You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@geode.apache.org by GitBox <gi...@apache.org> on 2020/10/06 22:24:48 UTC

[GitHub] [geode] alb3rtobr opened a new pull request #5600: GEODE-8202: Two-step serial gw sender threads start

alb3rtobr opened a new pull request #5600:
URL: https://github.com/apache/geode/pull/5600


   RFC: https://cwiki.apache.org/confluence/display/GEODE/New+option+for+serial+gw+sender+dispatcher+threads+start
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] onichols-pivotal edited a comment on pull request #5600: GEODE-8202: Two-step serial gw sender threads start

Posted by GitBox <gi...@apache.org>.
onichols-pivotal edited a comment on pull request #5600:
URL: https://github.com/apache/geode/pull/5600#issuecomment-759826939


   PR checks are failing to report status due to a rebase conflict.  Please rebase with latest develop to get unstuck (it may be less painful if you squash first).


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] alb3rtobr commented on pull request #5600: GEODE-8202: Two-step serial gw sender threads start

Posted by GitBox <gi...@apache.org>.
alb3rtobr commented on pull request #5600:
URL: https://github.com/apache/geode/pull/5600#issuecomment-704805752


   @davebarnes97 you are right, I agree: using something like `--enforce-all-threads-same-receiver` is more appropriate to indicate what that option is doing,  while the option I proposed indicates the "reason" for a different behavior on the thread start up process.
   What about `--enforce-threads-connect-same-receiver`?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] davebarnes97 commented on pull request #5600: GEODE-8202: Two-step serial gw sender threads start

Posted by GitBox <gi...@apache.org>.
davebarnes97 commented on pull request #5600:
URL: https://github.com/apache/geode/pull/5600#issuecomment-705018026


   > @davebarnes97 you are right, I agree: using something like `--enforce-all-threads-same-receiver` is more appropriate to indicate what that option is doing, while the option I proposed indicates the "reason" for a different behavior on the thread start up process.
   > What about `--enforce-threads-connect-same-receiver`?
   
   Thanks, Alberto -- that would satisfy my concern. Who else can we get to corroborate this change? I've added Barry as a reviewer in hopes that he'll render an opinion.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] davebarnes97 edited a comment on pull request #5600: GEODE-8202: Two-step serial gw sender threads start

Posted by GitBox <gi...@apache.org>.
davebarnes97 edited a comment on pull request #5600:
URL: https://github.com/apache/geode/pull/5600#issuecomment-705557152


   @alb3rtobr Thank you for improving the parameter name. With that in place, here is a suggested doc edit. I'm using present tense, rather than future tense, where possible, and changing some singular/plural to comply with common usage:
   
   
   <td>This parameter applies only to serial gateway senders. If true, receiver member id is checked by all dispatcher threads when the connection is established to ensure they connect to the same receiver. Instead of starting all dispatcher threads in parallel, one thread is started first, and after that the rest are started in parallel.</td>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] davebarnes97 commented on pull request #5600: GEODE-8202: Two-step serial gw sender threads start

Posted by GitBox <gi...@apache.org>.
davebarnes97 commented on pull request #5600:
URL: https://github.com/apache/geode/pull/5600#issuecomment-705557152


   @alb3rtobr Thank you for improving the parameter name. With that in place, here is a suggested doc edit. I'm using present tense, rather than future tense, where possible, and changing some singular/plural to comply with common usage.
   
   <td>This parameter applies only to serial gateway senders. If true, receiver member id is checked by all dispatcher threads when the connection is established to ensure they connect to the same receiver. Instead of starting all dispatcher threads in parallel, one thread is started first, and after that the rest are started in parallel.</td>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] alb3rtobr commented on pull request #5600: GEODE-8202: Two-step serial gw sender threads start

Posted by GitBox <gi...@apache.org>.
alb3rtobr commented on pull request #5600:
URL: https://github.com/apache/geode/pull/5600#issuecomment-759838737


   > PR checks are failing to report status due to a rebase conflict. Please rebase with latest develop to get unstuck (it may be less painful if you squash first).
   
   Thanks @onichols-pivotal . It would be a pitty if all the valuable comments in this PR were lost, so Im going to close this PR and I will open a new one.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] alb3rtobr commented on a change in pull request #5600: GEODE-8202: Two-step serial gw sender threads start

Posted by GitBox <gi...@apache.org>.
alb3rtobr commented on a change in pull request #5600:
URL: https://github.com/apache/geode/pull/5600#discussion_r555699186



##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
##########
@@ -146,6 +146,10 @@
    */
   private int batchSize;
 
+  private String expectedReceiverUniqueId = "";
+
+  private boolean enforceThreadsConnectSameReceiver = false;
+

Review comment:
       Good catch! Thanks




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] alb3rtobr commented on a change in pull request #5600: GEODE-8202: Two-step serial gw sender threads start

Posted by GitBox <gi...@apache.org>.
alb3rtobr commented on a change in pull request #5600:
URL: https://github.com/apache/geode/pull/5600#discussion_r534079046



##########
File path: geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java
##########
@@ -77,4 +132,160 @@ public void getConnectionShouldCreateNewConnectionWhenServerIsNull() {
     verify(dispatcher, times(1)).initializeConnection();
     verify(dispatcher, times(2)).getConnectionLifeCycleLock();
   }
+
+  @Test
+  public void enforceThreadsConnectSameReceiver_initializeParallelSenderConnection_retriesAreNotUsed() {

Review comment:
       I read the article when you posted it the first time (interesting reading, thanks!) and tried to follow the `UnitOfWork_StateUnderTest_ExpectedBehavior` schema they propose (which btw I found it very similar to the `given-when-then` naming approach). It seems I did not understand the article in the same way as you, sorry. I used `enforceThreadsConnectSameReceiver` as `UnitOfWork` because this is what this PR is introducing.
   
   The names you have proposed look fine for me, they really look like requirements when you read them. 
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] DonalEvans commented on a change in pull request #5600: GEODE-8202: Two-step serial gw sender threads start

Posted by GitBox <gi...@apache.org>.
DonalEvans commented on a change in pull request #5600:
URL: https://github.com/apache/geode/pull/5600#discussion_r533812068



##########
File path: geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java
##########
@@ -77,4 +132,160 @@ public void getConnectionShouldCreateNewConnectionWhenServerIsNull() {
     verify(dispatcher, times(1)).initializeConnection();
     verify(dispatcher, times(2)).getConnectionLifeCycleLock();
   }
+
+  @Test
+  public void enforceThreadsConnectSameReceiver_initializeParallelSenderConnection_retriesAreNotUsed() {
+    when(senderMock.isParallel()).thenReturn(true);
+
+    eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+    dispatcherSpy.initializeConnection();
+
+    verify(senderMock, times(0)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(1)).setServerLocation(any());
+    verify(poolMock, times(1)).acquireConnection();
+    verify(dispatcherSpy, times(0)).retryInitializeConnection(connectionMock);
+  }
+
+  @Test
+  public void enforceThreadsConnectSameReceiver_initializingConnectionOfSerialSenderWithOptionSetToFalse_retriesAreNotUsed() {

Review comment:
       This test name might be better as "intializeConnectionWithSerialSenderAndEnforceThreadsConnectSameRecieverFalseDoesNotRetryInitializeConnection".

##########
File path: geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java
##########
@@ -77,4 +132,160 @@ public void getConnectionShouldCreateNewConnectionWhenServerIsNull() {
     verify(dispatcher, times(1)).initializeConnection();
     verify(dispatcher, times(2)).getConnectionLifeCycleLock();
   }
+
+  @Test
+  public void enforceThreadsConnectSameReceiver_initializeParallelSenderConnection_retriesAreNotUsed() {
+    when(senderMock.isParallel()).thenReturn(true);
+
+    eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+    dispatcherSpy.initializeConnection();
+
+    verify(senderMock, times(0)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(1)).setServerLocation(any());
+    verify(poolMock, times(1)).acquireConnection();
+    verify(dispatcherSpy, times(0)).retryInitializeConnection(connectionMock);
+  }
+
+  @Test
+  public void enforceThreadsConnectSameReceiver_initializingConnectionOfSerialSenderWithOptionSetToFalse_retriesAreNotUsed() {
+    when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(false);
+
+    when(connectionMock.getEndpoint()).thenReturn(endpointMock);
+    when(endpointMock.getMemberId()).thenReturn(memberIdMock);
+    when(memberIdMock.getUniqueId()).thenReturn("receiverId");
+
+    eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+    dispatcherSpy.initializeConnection();
+
+    verify(senderMock, times(1)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(1)).getEnforceThreadsConnectSameReceiver();
+    verify(poolMock, times(1)).acquireConnection();
+    verify(dispatcherSpy, times(0)).retryInitializeConnection(connectionMock);
+  }
+
+  @Test
+  public void enforceThreadsConnectSameReceiver_initializingConnectionOfSerialSenderWithOptionSetToTrue_firstThreadObtainsTheReceiverId() {

Review comment:
       This test might be better named "initializeConnectionWithSerialSenderAndEnforceThreadsConnectSameReceiverTrueAndNoExpectedReceiverSetsReceiverIdAndDoesNotReacquireConnection".

##########
File path: geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java
##########
@@ -77,4 +132,160 @@ public void getConnectionShouldCreateNewConnectionWhenServerIsNull() {
     verify(dispatcher, times(1)).initializeConnection();
     verify(dispatcher, times(2)).getConnectionLifeCycleLock();
   }
+
+  @Test
+  public void enforceThreadsConnectSameReceiver_initializeParallelSenderConnection_retriesAreNotUsed() {
+    when(senderMock.isParallel()).thenReturn(true);
+
+    eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+    dispatcherSpy.initializeConnection();
+
+    verify(senderMock, times(0)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(1)).setServerLocation(any());
+    verify(poolMock, times(1)).acquireConnection();
+    verify(dispatcherSpy, times(0)).retryInitializeConnection(connectionMock);
+  }
+
+  @Test
+  public void enforceThreadsConnectSameReceiver_initializingConnectionOfSerialSenderWithOptionSetToFalse_retriesAreNotUsed() {
+    when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(false);
+
+    when(connectionMock.getEndpoint()).thenReturn(endpointMock);
+    when(endpointMock.getMemberId()).thenReturn(memberIdMock);
+    when(memberIdMock.getUniqueId()).thenReturn("receiverId");
+
+    eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+    dispatcherSpy.initializeConnection();
+
+    verify(senderMock, times(1)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(1)).getEnforceThreadsConnectSameReceiver();
+    verify(poolMock, times(1)).acquireConnection();
+    verify(dispatcherSpy, times(0)).retryInitializeConnection(connectionMock);
+  }
+
+  @Test
+  public void enforceThreadsConnectSameReceiver_initializingConnectionOfSerialSenderWithOptionSetToTrue_firstThreadObtainsTheReceiverId() {
+
+    when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(true);
+
+    when(connectionMock.getEndpoint()).thenReturn(endpointMock);
+    when(endpointMock.getMemberId()).thenReturn(memberIdMock);
+    when(memberIdMock.getUniqueId()).thenReturn("receiverId");
+    when(eventProcessorMock.getExpectedReceiverUniqueId()).thenReturn("");
+
+    eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+    dispatcherSpy.initializeConnection();
+
+    verify(senderMock, times(1)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(1)).getEnforceThreadsConnectSameReceiver();
+    verify(dispatcherSpy, times(1)).retryInitializeConnection(connectionMock);
+    verify(poolMock, times(1)).acquireConnection();
+    verify(eventProcessorMock, times(1)).setExpectedReceiverUniqueId("receiverId");
+  }
+
+  @Test
+  public void enforceThreadsConnectSameReceiver_initializingConnectionOfSerialSenderWithOptionSetToTrue_threadConnectsToExpectedReceiverWithNoRetry() {

Review comment:
       This test might be better named "initializeConnectionWithSerialSenderAndEnforceThreadsConnectSameReceiverTrueAndConnectedToExpectedReceiverDoesNotReacquireConnection".

##########
File path: geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java
##########
@@ -77,4 +132,160 @@ public void getConnectionShouldCreateNewConnectionWhenServerIsNull() {
     verify(dispatcher, times(1)).initializeConnection();
     verify(dispatcher, times(2)).getConnectionLifeCycleLock();
   }
+
+  @Test
+  public void enforceThreadsConnectSameReceiver_initializeParallelSenderConnection_retriesAreNotUsed() {

Review comment:
       These test names are still rather confusing in terms of what the test is doing and expecting to see. As per [the blog post on good test naming](https://osherove.com/blog/2005/4/3/naming-standards-for-unit-tests.html) I linked previously, a test name should ideally be of the form: (what work the test is doing)\_(what the conditions are)\_(what the expected result is), so for this test, a good name might be "initializeConnectionWithParalellSenderDoesNotRetryInitializeConnection".

##########
File path: geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java
##########
@@ -77,4 +132,160 @@ public void getConnectionShouldCreateNewConnectionWhenServerIsNull() {
     verify(dispatcher, times(1)).initializeConnection();
     verify(dispatcher, times(2)).getConnectionLifeCycleLock();
   }
+
+  @Test
+  public void enforceThreadsConnectSameReceiver_initializeParallelSenderConnection_retriesAreNotUsed() {
+    when(senderMock.isParallel()).thenReturn(true);
+
+    eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+    dispatcherSpy.initializeConnection();
+
+    verify(senderMock, times(0)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(1)).setServerLocation(any());
+    verify(poolMock, times(1)).acquireConnection();
+    verify(dispatcherSpy, times(0)).retryInitializeConnection(connectionMock);
+  }
+
+  @Test
+  public void enforceThreadsConnectSameReceiver_initializingConnectionOfSerialSenderWithOptionSetToFalse_retriesAreNotUsed() {
+    when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(false);
+
+    when(connectionMock.getEndpoint()).thenReturn(endpointMock);
+    when(endpointMock.getMemberId()).thenReturn(memberIdMock);
+    when(memberIdMock.getUniqueId()).thenReturn("receiverId");
+
+    eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+    dispatcherSpy.initializeConnection();
+
+    verify(senderMock, times(1)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(1)).getEnforceThreadsConnectSameReceiver();
+    verify(poolMock, times(1)).acquireConnection();
+    verify(dispatcherSpy, times(0)).retryInitializeConnection(connectionMock);
+  }
+
+  @Test
+  public void enforceThreadsConnectSameReceiver_initializingConnectionOfSerialSenderWithOptionSetToTrue_firstThreadObtainsTheReceiverId() {
+
+    when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(true);
+
+    when(connectionMock.getEndpoint()).thenReturn(endpointMock);
+    when(endpointMock.getMemberId()).thenReturn(memberIdMock);
+    when(memberIdMock.getUniqueId()).thenReturn("receiverId");
+    when(eventProcessorMock.getExpectedReceiverUniqueId()).thenReturn("");
+
+    eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+    dispatcherSpy.initializeConnection();
+
+    verify(senderMock, times(1)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(1)).getEnforceThreadsConnectSameReceiver();
+    verify(dispatcherSpy, times(1)).retryInitializeConnection(connectionMock);
+    verify(poolMock, times(1)).acquireConnection();
+    verify(eventProcessorMock, times(1)).setExpectedReceiverUniqueId("receiverId");
+  }
+
+  @Test
+  public void enforceThreadsConnectSameReceiver_initializingConnectionOfSerialSenderWithOptionSetToTrue_threadConnectsToExpectedReceiverWithNoRetry() {
+
+    when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(true);
+
+    when(connectionMock.getEndpoint()).thenReturn(endpointMock);
+    when(endpointMock.getMemberId()).thenReturn(memberIdMock);
+    when(memberIdMock.getUniqueId()).thenReturn("expectedId");
+    when(eventProcessorMock.getExpectedReceiverUniqueId()).thenReturn("expectedId");
+
+    eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+    dispatcherSpy.initializeConnection();
+
+    verify(senderMock, times(1)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(1)).getEnforceThreadsConnectSameReceiver();
+    verify(dispatcherSpy, times(1)).retryInitializeConnection(connectionMock);
+    verify(poolMock, times(1)).acquireConnection();
+    verify(eventProcessorMock, times(0)).setExpectedReceiverUniqueId(any());
+  }
+
+  @Test
+  public void enforceThreadsConnectSameReceiver_initializingConnectionOfSerialSenderWithOptionSetToTrue_threadConnectsToExpectedReceiverWithRetry() {

Review comment:
       This test might be better named "initializeConnectionWithSerialSenderAndEnforceThreadsConnectSameReceiverTrueAndConnectedToExpectedReceiverOnDecondTryReacquiresConnectionOnce".

##########
File path: geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java
##########
@@ -77,4 +132,160 @@ public void getConnectionShouldCreateNewConnectionWhenServerIsNull() {
     verify(dispatcher, times(1)).initializeConnection();
     verify(dispatcher, times(2)).getConnectionLifeCycleLock();
   }
+
+  @Test
+  public void enforceThreadsConnectSameReceiver_initializeParallelSenderConnection_retriesAreNotUsed() {
+    when(senderMock.isParallel()).thenReturn(true);
+
+    eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+    dispatcherSpy.initializeConnection();
+
+    verify(senderMock, times(0)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(1)).setServerLocation(any());
+    verify(poolMock, times(1)).acquireConnection();
+    verify(dispatcherSpy, times(0)).retryInitializeConnection(connectionMock);
+  }
+
+  @Test
+  public void enforceThreadsConnectSameReceiver_initializingConnectionOfSerialSenderWithOptionSetToFalse_retriesAreNotUsed() {
+    when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(false);
+
+    when(connectionMock.getEndpoint()).thenReturn(endpointMock);
+    when(endpointMock.getMemberId()).thenReturn(memberIdMock);
+    when(memberIdMock.getUniqueId()).thenReturn("receiverId");
+
+    eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+    dispatcherSpy.initializeConnection();
+
+    verify(senderMock, times(1)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(1)).getEnforceThreadsConnectSameReceiver();
+    verify(poolMock, times(1)).acquireConnection();
+    verify(dispatcherSpy, times(0)).retryInitializeConnection(connectionMock);
+  }
+
+  @Test
+  public void enforceThreadsConnectSameReceiver_initializingConnectionOfSerialSenderWithOptionSetToTrue_firstThreadObtainsTheReceiverId() {
+
+    when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(true);
+
+    when(connectionMock.getEndpoint()).thenReturn(endpointMock);
+    when(endpointMock.getMemberId()).thenReturn(memberIdMock);
+    when(memberIdMock.getUniqueId()).thenReturn("receiverId");
+    when(eventProcessorMock.getExpectedReceiverUniqueId()).thenReturn("");
+
+    eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+    dispatcherSpy.initializeConnection();
+
+    verify(senderMock, times(1)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(1)).getEnforceThreadsConnectSameReceiver();
+    verify(dispatcherSpy, times(1)).retryInitializeConnection(connectionMock);
+    verify(poolMock, times(1)).acquireConnection();
+    verify(eventProcessorMock, times(1)).setExpectedReceiverUniqueId("receiverId");
+  }
+
+  @Test
+  public void enforceThreadsConnectSameReceiver_initializingConnectionOfSerialSenderWithOptionSetToTrue_threadConnectsToExpectedReceiverWithNoRetry() {
+
+    when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(true);
+
+    when(connectionMock.getEndpoint()).thenReturn(endpointMock);
+    when(endpointMock.getMemberId()).thenReturn(memberIdMock);
+    when(memberIdMock.getUniqueId()).thenReturn("expectedId");
+    when(eventProcessorMock.getExpectedReceiverUniqueId()).thenReturn("expectedId");
+
+    eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+    dispatcherSpy.initializeConnection();
+
+    verify(senderMock, times(1)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(1)).getEnforceThreadsConnectSameReceiver();
+    verify(dispatcherSpy, times(1)).retryInitializeConnection(connectionMock);
+    verify(poolMock, times(1)).acquireConnection();
+    verify(eventProcessorMock, times(0)).setExpectedReceiverUniqueId(any());
+  }
+
+  @Test
+  public void enforceThreadsConnectSameReceiver_initializingConnectionOfSerialSenderWithOptionSetToTrue_threadConnectsToExpectedReceiverWithRetry() {
+
+    when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(true);
+
+    when(connectionMock.getEndpoint()).thenReturn(endpointMock);
+    when(endpointMock.getMemberId()).thenReturn(memberIdMock);
+    when(memberIdMock.getUniqueId()).thenReturn("notExpectedId").thenReturn("expectedId");
+    when(eventProcessorMock.getExpectedReceiverUniqueId()).thenReturn("expectedId");
+
+    eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+    dispatcherSpy.initializeConnection();
+
+    verify(senderMock, times(1)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(1)).getEnforceThreadsConnectSameReceiver();
+    verify(dispatcherSpy, times(1)).retryInitializeConnection(connectionMock);
+    verify(poolMock, times(2)).acquireConnection();
+    verify(eventProcessorMock, times(0)).setExpectedReceiverUniqueId(any());
+
+  }
+
+  @Test
+  public void enforceThreadsConnectSameReceiver_initializingConnectionOfSerialSenderWithOptionSetToTrue_exceptionThrownWhenMaxRetriesReachedAndNoServersAvailable() {
+
+    when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(true);
+
+    when(connectionMock.getEndpoint()).thenReturn(endpointMock);
+    when(endpointMock.getMemberId()).thenReturn(memberIdMock);
+    when(memberIdMock.getUniqueId()).thenReturn("notExpectedId");
+    when(eventProcessorMock.getExpectedReceiverUniqueId()).thenReturn("expectedId");
+
+    eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+
+    String expectedExceptionMessage =
+        "There are no active servers. "
+            + GatewaySenderEventRemoteDispatcher.maxAttemptsReachedConnectingServerIdExceptionMessage
+            + " [expectedId] (5 attempts)";
+    assertThatThrownBy(() -> {
+      dispatcherSpy.initializeConnection();
+    }).isInstanceOf(GatewaySenderException.class).hasMessageContaining(expectedExceptionMessage);
+
+    verify(senderMock, times(1)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(2)).getEnforceThreadsConnectSameReceiver();
+    verify(dispatcherSpy, times(1)).retryInitializeConnection(connectionMock);
+    verify(poolMock, times(5)).acquireConnection();
+    verify(eventProcessorMock, times(0)).setExpectedReceiverUniqueId(any());
+  }
+
+  @Test
+  public void enforceThreadsConnectSameReceiver_initializingConnectionOfSerialSenderWithOptionSetToTrue_exceptionThrownWhenMaxRetriesReachedButServersAreAvailable() {

Review comment:
       This test might be better named "initializeConnectionWithSerialSenderAndEnforceThreadsConnectSameReceiverTrueAndMaxRetriesExceededAndServersAvailableThrowsException".

##########
File path: geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java
##########
@@ -77,4 +132,160 @@ public void getConnectionShouldCreateNewConnectionWhenServerIsNull() {
     verify(dispatcher, times(1)).initializeConnection();
     verify(dispatcher, times(2)).getConnectionLifeCycleLock();
   }
+
+  @Test
+  public void enforceThreadsConnectSameReceiver_initializeParallelSenderConnection_retriesAreNotUsed() {
+    when(senderMock.isParallel()).thenReturn(true);
+
+    eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+    dispatcherSpy.initializeConnection();
+
+    verify(senderMock, times(0)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(1)).setServerLocation(any());
+    verify(poolMock, times(1)).acquireConnection();
+    verify(dispatcherSpy, times(0)).retryInitializeConnection(connectionMock);
+  }
+
+  @Test
+  public void enforceThreadsConnectSameReceiver_initializingConnectionOfSerialSenderWithOptionSetToFalse_retriesAreNotUsed() {
+    when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(false);
+
+    when(connectionMock.getEndpoint()).thenReturn(endpointMock);
+    when(endpointMock.getMemberId()).thenReturn(memberIdMock);
+    when(memberIdMock.getUniqueId()).thenReturn("receiverId");
+
+    eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+    dispatcherSpy.initializeConnection();
+
+    verify(senderMock, times(1)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(1)).getEnforceThreadsConnectSameReceiver();
+    verify(poolMock, times(1)).acquireConnection();
+    verify(dispatcherSpy, times(0)).retryInitializeConnection(connectionMock);
+  }
+
+  @Test
+  public void enforceThreadsConnectSameReceiver_initializingConnectionOfSerialSenderWithOptionSetToTrue_firstThreadObtainsTheReceiverId() {
+
+    when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(true);
+
+    when(connectionMock.getEndpoint()).thenReturn(endpointMock);
+    when(endpointMock.getMemberId()).thenReturn(memberIdMock);
+    when(memberIdMock.getUniqueId()).thenReturn("receiverId");
+    when(eventProcessorMock.getExpectedReceiverUniqueId()).thenReturn("");
+
+    eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+    dispatcherSpy.initializeConnection();
+
+    verify(senderMock, times(1)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(1)).getEnforceThreadsConnectSameReceiver();
+    verify(dispatcherSpy, times(1)).retryInitializeConnection(connectionMock);
+    verify(poolMock, times(1)).acquireConnection();
+    verify(eventProcessorMock, times(1)).setExpectedReceiverUniqueId("receiverId");
+  }
+
+  @Test
+  public void enforceThreadsConnectSameReceiver_initializingConnectionOfSerialSenderWithOptionSetToTrue_threadConnectsToExpectedReceiverWithNoRetry() {
+
+    when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(true);
+
+    when(connectionMock.getEndpoint()).thenReturn(endpointMock);
+    when(endpointMock.getMemberId()).thenReturn(memberIdMock);
+    when(memberIdMock.getUniqueId()).thenReturn("expectedId");
+    when(eventProcessorMock.getExpectedReceiverUniqueId()).thenReturn("expectedId");
+
+    eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+    dispatcherSpy.initializeConnection();
+
+    verify(senderMock, times(1)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(1)).getEnforceThreadsConnectSameReceiver();
+    verify(dispatcherSpy, times(1)).retryInitializeConnection(connectionMock);
+    verify(poolMock, times(1)).acquireConnection();
+    verify(eventProcessorMock, times(0)).setExpectedReceiverUniqueId(any());
+  }
+
+  @Test
+  public void enforceThreadsConnectSameReceiver_initializingConnectionOfSerialSenderWithOptionSetToTrue_threadConnectsToExpectedReceiverWithRetry() {
+
+    when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(true);
+
+    when(connectionMock.getEndpoint()).thenReturn(endpointMock);
+    when(endpointMock.getMemberId()).thenReturn(memberIdMock);
+    when(memberIdMock.getUniqueId()).thenReturn("notExpectedId").thenReturn("expectedId");
+    when(eventProcessorMock.getExpectedReceiverUniqueId()).thenReturn("expectedId");
+
+    eventDispatcher = new GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+    dispatcherSpy.initializeConnection();
+
+    verify(senderMock, times(1)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(1)).getEnforceThreadsConnectSameReceiver();
+    verify(dispatcherSpy, times(1)).retryInitializeConnection(connectionMock);
+    verify(poolMock, times(2)).acquireConnection();
+    verify(eventProcessorMock, times(0)).setExpectedReceiverUniqueId(any());
+
+  }
+
+  @Test
+  public void enforceThreadsConnectSameReceiver_initializingConnectionOfSerialSenderWithOptionSetToTrue_exceptionThrownWhenMaxRetriesReachedAndNoServersAvailable() {

Review comment:
       This test might be better named "initializeConnectionWithSerialSenderAndEnforceThreadsConnectSameReceiverTrueAndMaxRetriesExceededAndNoServersAvailableThrowsException".




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] boglesby commented on pull request #5600: GEODE-8202: Two-step serial gw sender threads start

Posted by GitBox <gi...@apache.org>.
boglesby commented on pull request #5600:
URL: https://github.com/apache/geode/pull/5600#issuecomment-755014843


   Here is a another diff with the versioning changes to GatewaySenderAdvisor:  
   ```
   diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java
   index adf80cb1e5..2390e7241e 100644
   --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java
   +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java
   @@ -232,6 +232,16 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
                  "Cannot create Gateway Sender %s with isDiskSynchronous %s because another cache has the same Gateway Sender defined with isDiskSynchronous %s",
                  sp.Id, sp.isDiskSynchronous, sender.isDiskSynchronous()));
        }
   +
   +    if (sp.getDistributedMember().getVersion().isNotOlderThan(KnownVersion.GEODE_1_14_0)) {
   +      if (sp.enforceThreadsConnectSameReceiver != sender.getEnforceThreadsConnectSameReceiver()) {
   +        throw new IllegalStateException(
   +            String.format(
   +                "Cannot create Gateway Sender %s with enforceThreadsConnectSameReceiver %s because another cache has the same Gateway Sender defined with enforceThreadsConnectSameReceiver %s",
   +                sp.Id, sp.enforceThreadsConnectSameReceiver,
   +                sender.getEnforceThreadsConnectSameReceiver()));
   +      }
   +    }
      }
    
      /**
   @@ -532,6 +542,8 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
    
        public ServerLocation serverLocation;
    
   +    public boolean enforceThreadsConnectSameReceiver = false;
   +
        public GatewaySenderProfile(InternalDistributedMember memberId, int version) {
          super(memberId, version);
        }
   @@ -541,6 +553,12 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
        @Override
        public void fromData(DataInput in,
            DeserializationContext context) throws IOException, ClassNotFoundException {
   +      fromDataPre_GEODE_1_14_0_0(in, context);
   +      this.enforceThreadsConnectSameReceiver = in.readBoolean();
   +    }
   +
   +    public void fromDataPre_GEODE_1_14_0_0(DataInput in,
   +        DeserializationContext context) throws IOException, ClassNotFoundException {
          super.fromData(in, context);
          this.Id = DataSerializer.readString(in);
          this.startTime = in.readLong();
   @@ -583,6 +601,12 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
        @Override
        public void toData(DataOutput out,
            SerializationContext context) throws IOException {
   +      toDataPre_GEODE_1_14_0_0(out, context);
   +      out.writeBoolean(enforceThreadsConnectSameReceiver);
   +    }
   +
   +    public void toDataPre_GEODE_1_14_0_0(DataOutput out,
   +        SerializationContext context) throws IOException {
          super.toData(out, context);
          DataSerializer.writeString(Id, out);
          out.writeLong(startTime);
   @@ -684,7 +708,7 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
    
        @Immutable
        private static final KnownVersion[] serializationVersions =
   -        new KnownVersion[] {KnownVersion.GFE_80};
   +        new KnownVersion[] {KnownVersion.GFE_80, KnownVersion.GEODE_1_14_0};
    
        @Override
        public KnownVersion[] getSerializationVersions() {
   diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
   index 7c93836667..1b5206b532 100644
   --- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
   +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
   @@ -230,6 +230,7 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender {
        pf.dispatcherThreads = getDispatcherThreads();
        pf.orderPolicy = getOrderPolicy();
        pf.serverLocation = this.getServerLocation();
   +    pf.enforceThreadsConnectSameReceiver = getEnforceThreadsConnectSameReceiver();
      }
    
      @Override
   ```
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] alb3rtobr commented on a change in pull request #5600: GEODE-8202: Two-step serial gw sender threads start

Posted by GitBox <gi...@apache.org>.
alb3rtobr commented on a change in pull request #5600:
URL: https://github.com/apache/geode/pull/5600#discussion_r533779942



##########
File path: geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
##########
@@ -472,6 +516,12 @@ private GatewaySenderException getInitializeConnectionExceptionToThrow(
                 "No available connection was found, but the following active servers exist: %s",
                 buffer.toString());
       }
+      if (this.sender.getEnforceThreadsConnectSameReceiver()) {
+        if (Pattern.compile("Cannot get connection to .* after .* attempts.")

Review comment:
       I have added a commit that extracts the exception message to a variable, this will avoid breaking the check if the text is changed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] alb3rtobr commented on a change in pull request #5600: GEODE-8202: Two-step serial gw sender threads start

Posted by GitBox <gi...@apache.org>.
alb3rtobr commented on a change in pull request #5600:
URL: https://github.com/apache/geode/pull/5600#discussion_r533469342



##########
File path: geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
##########
@@ -348,11 +349,56 @@ public void destroyConnection() {
     }
   }
 
+  Connection retryInitializeConnection(Connection con) {
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+    ServerLocation server = this.sender.getServerLocation();
+    String connectedServerId = con.getEndpoint().getMemberId().getUniqueId();
+    String expectedServerId = this.processor.getExpectedReceiverUniqueId();
+    boolean connectedToExpectedReceiver = connectedServerId.equals(expectedServerId);
+    if (expectedServerId.equals("")) {
+      if (isDebugEnabled) {
+        logger.debug("First dispatcher connected to endpoint " + connectedServerId);
+      }
+      this.processor.setExpectedReceiverUniqueId(connectedServerId);
+      connectedToExpectedReceiver = true;
+    }
+    int attempt = 0;
+    final int maxAttempts = 5;

Review comment:
       If several receivers are sharing the same ip and port, the connection will be assigned randomly by an external proxy or load balancer. Its true that using this fix value is not the best option... If for example we have 5 receivers, we could spend all the connection retries without reaching the desired receiver.
   Maybe I could increment the value of `maxAttempt` when a connection to a wrong receiver is obtained and its the first time we reach that wrong receiver. This will cause that `maxAttempt` value is dependent on the number of receivers.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] alb3rtobr closed pull request #5600: GEODE-8202: Two-step serial gw sender threads start

Posted by GitBox <gi...@apache.org>.
alb3rtobr closed pull request #5600:
URL: https://github.com/apache/geode/pull/5600


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] alb3rtobr commented on pull request #5600: GEODE-8202: Two-step serial gw sender threads start

Posted by GitBox <gi...@apache.org>.
alb3rtobr commented on pull request #5600:
URL: https://github.com/apache/geode/pull/5600#issuecomment-718565202


   Im moving this PR to draft due to I will not be able to work on it for several weeks, sorry for that


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] davebarnes97 edited a comment on pull request #5600: GEODE-8202: Two-step serial gw sender threads start

Posted by GitBox <gi...@apache.org>.
davebarnes97 edited a comment on pull request #5600:
URL: https://github.com/apache/geode/pull/5600#issuecomment-705018026


   > @davebarnes97 you are right, I agree: using something like `--enforce-all-threads-same-receiver` is more appropriate to indicate what that option is doing, while the option I proposed indicates the "reason" for a different behavior on the thread start up process.
   > What about `--enforce-threads-connect-same-receiver`?
   
   Thanks, Alberto -- that would satisfy my concern. Who else can we get to corroborate this change? I've added @boglesby  as a reviewer in hopes that he'll render an opinion.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] davebarnes97 commented on pull request #5600: GEODE-8202: Two-step serial gw sender threads start

Posted by GitBox <gi...@apache.org>.
davebarnes97 commented on pull request #5600:
URL: https://github.com/apache/geode/pull/5600#issuecomment-704612068


   @alb3rtobr Thank you for your contribution. I read the documentation you provided in `geode-docs/tools_modules/gfsh/command-pages/create.html.md.erb` and I'm working on a few grammatical changes. But before I submit them, I would like to better understand the name of the property you propose, `--receivers-sharing-ip-and-port`, and the part of the explanation that says "Sets whether or not the receivers are sharing the same ip and port."
   If I understand the RFC correctly, the general context is an environment in which multiple receivers specify a duplicate ip:port combination (not exactly 'sharing'), and the purpose of the property is to say to gfsh, "When you create this sender, be sure that all of its dispatcher threads connect to the same receiver."
   Also, it sounds like this option would work (though it would be unnecessary) if there was only one receiver up and running.
   So I think this property needs a more precise name. Something like `--enforce-all-threads-same-receiver` or something like that.
   [This became very long. I've contributed it as a comment on the RFC.]


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] davebarnes97 edited a comment on pull request #5600: GEODE-8202: Two-step serial gw sender threads start

Posted by GitBox <gi...@apache.org>.
davebarnes97 edited a comment on pull request #5600:
URL: https://github.com/apache/geode/pull/5600#issuecomment-705557152


   @alb3rtobr Thank you for improving the parameter name. With that in place, here is a suggested doc edit. I'm using present tense, rather than future tense, where possible, and changing some singular/plural to comply with common usage:
   
   
   <td>This parameter applies only to serial gateway senders. If true, receiver member id is checked by all dispatcher threads when the connection is established to ensure they connect to the same receiver. Instead of starting all dispatcher threads in parallel, one thread is started first, and after that the rest are started in parallel.</td>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] boglesby commented on pull request #5600: GEODE-8202: Two-step serial gw sender threads start

Posted by GitBox <gi...@apache.org>.
boglesby commented on pull request #5600:
URL: https://github.com/apache/geode/pull/5600#issuecomment-754953043


   I'm still reviewing these changes, but here are a few things I see.
   
   I can run a test like:
   
   1. Start server 1 with a gateway sender with enforce-threads-connect-same-receiver=true
   2. Start server 1 with a gateway sender with enforce-threads-connect-same-receiver=false
   
   The servers successfully start up in this case, but I don't think they should with these conflicting property values.
   
   The gfsh commands look like:
   ```
   start server --name=server-A-1 --group=server-A-1 --locators=localhost[10331] --mcast-port=0 --server-port=0
   
   start server --name=server-A-2 --group=server-A-2 --locators=localhost[10331] --mcast-port=0 --server-port=0
   
   create gateway-sender --id=B --group=server-A-1 --remote-distributed-system-id=2 --enforce-threads-connect-same-receiver=true
   
   create gateway-sender --id=B --group=server-A-2 --remote-distributed-system-id=2 --enforce-threads-connect-same-receiver=false
   ```
   The generated xml looks like:
   
   server-A-1:
   ```
   <cache ...>
     <gateway-sender enforce-threads-connect-same-receiver="true" id="B" .../>
   </cache>
   ```
   server-A-2:
   ```
   <cache ...>
     <gateway-sender enforce-threads-connect-same-receiver="false" id="B" .../>
   </cache>
   ```
   Here is an example where conflicting gateway sender parameters cause the startup to fail:
   ```
   create gateway-sender --id=B --group=server-A-1 --remote-distributed-system-id=2 --parallel=true
   
   create gateway-sender --id=B --group=server-A-2 --remote-distributed-system-id=2 --parallel=false
   ```
   This error message is logged:
   ```
   Executing - create gateway-sender --id=B --group=server-A-2 --remote-distributed-system-id=2 --parallel=false
   
     Member   | Status | Message
   ---------- | ------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------
   server-A-2 | ERROR  |  java.lang.IllegalStateException: Cannot create Gateway Sender B as serial gateway sender because another cache has the same sender as parallel gateway sender
   
   And this error message is in the logs:
   
   [error 2021/01/05 12:05:51.385 PST server-A-2 <Function Execution Processor2> tid=0x3f] Cannot create Gateway Sender B as serial gateway sender because another cache has the same sender as parallel gateway sender
   java.lang.IllegalStateException: Cannot create Gateway Sender B as serial gateway sender because another cache has the same sender as parallel gateway sender
   	at Remote Member '10.166.145.67(server-A-1:89588)<v1>:41001' in org.apache.geode.internal.cache.wan.GatewaySenderAdvisor.checkCompatibility(GatewaySenderAdvisor.java:125)
   	at Remote Member '10.166.145.67(server-A-1:89588)<v1>:41001' in org.apache.geode.internal.cache.wan.GatewaySenderAdvisor.profileCreated(GatewaySenderAdvisor.java:105)
   	at Remote Member '10.166.145.67(server-A-1:89588)<v1>:41001' in org.apache.geode.distributed.internal.DistributionAdvisor.doPutProfile(DistributionAdvisor.java:625)
   	at Remote Member '10.166.145.67(server-A-1:89588)<v1>:41001' in org.apache.geode.distributed.internal.DistributionAdvisor.putProfile(DistributionAdvisor.java:523)
   	at Remote Member '10.166.145.67(server-A-1:89588)<v1>:41001' in org.apache.geode.distributed.internal.DistributionAdvisor.putProfile(DistributionAdvisor.java:518)
   	at Remote Member '10.166.145.67(server-A-1:89588)<v1>:41001' in org.apache.geode.distributed.internal.DistributionAdvisor$Profile.handleDistributionAdvisee(DistributionAdvisor.java:1537)
   	at Remote Member '10.166.145.67(server-A-1:89588)<v1>:41001' in org.apache.geode.internal.cache.wan.GatewaySenderAdvisor$GatewaySenderProfile.processIncoming(GatewaySenderAdvisor.java:708)
   	at org.apache.geode.distributed.internal.ReplyException.handleCause(ReplyException.java:86)
   	at org.apache.geode.internal.cache.UpdateAttributesProcessor.waitForProfileResponse(UpdateAttributesProcessor.java:113)
   	at org.apache.geode.internal.cache.UpdateAttributesProcessor.distribute(UpdateAttributesProcessor.java:99)
   	at org.apache.geode.internal.cache.GemFireCacheImpl.addGatewaySender(GemFireCacheImpl.java:3732)
   	at org.apache.geode.internal.cache.wan.GatewaySenderFactoryImpl.create(GatewaySenderFactoryImpl.java:300)
   	at org.apache.geode.management.internal.cli.functions.GatewaySenderCreateFunction.createGatewaySender(GatewaySenderCreateFunction.java:183)
   	at org.apache.geode.management.internal.cli.functions.GatewaySenderCreateFunction.execute(GatewaySenderCreateFunction.java:60)
   ```
   Something similar should happen with the enforce-threads-connect-same-receiver property.
   
   This check and error message are coming from GatewaySenderAdvisor.checkCompatibility.
   
   GatewaySenderProfile needs to be updated to include enforceThreadsConnectSameReceiver. Here are some changes that do this:
   ```
   diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java
   index adf80cb1e5..dc70a0ae6b 100644
   --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java
   +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java
   @@ -232,6 +232,14 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
                  "Cannot create Gateway Sender %s with isDiskSynchronous %s because another cache has the same Gateway Sender defined with isDiskSynchronous %s",
                  sp.Id, sp.isDiskSynchronous, sender.isDiskSynchronous()));
        }
   +
   +    if (sp.enforceThreadsConnectSameReceiver != sender.getEnforceThreadsConnectSameReceiver()) {
   +      throw new IllegalStateException(
   +          String.format(
   +              "Cannot create Gateway Sender %s with enforceThreadsConnectSameReceiver %s because another cache has the same Gateway Sender defined with enforceThreadsConnectSameReceiver %s",
   +              sp.Id, sp.enforceThreadsConnectSameReceiver,
   +              sender.getEnforceThreadsConnectSameReceiver()));
   +    }
      }
    
      /**
   @@ -532,6 +540,8 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
    
        public ServerLocation serverLocation;
    
   +    public boolean enforceThreadsConnectSameReceiver = false;
   +
        public GatewaySenderProfile(InternalDistributedMember memberId, int version) {
          super(memberId, version);
        }
   @@ -578,6 +588,7 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
            this.serverLocation = new ServerLocation();
            InternalDataSerializer.invokeFromData(this.serverLocation, in);
          }
   +      this.enforceThreadsConnectSameReceiver = in.readBoolean();
        }
    
        @Override
   @@ -617,6 +628,7 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
          if (serverLocationFound) {
            InternalDataSerializer.invokeToData(serverLocation, out);
          }
   +      out.writeBoolean(enforceThreadsConnectSameReceiver);
        }
    
        public void fromDataPre_GFE_8_0_0_0(DataInput in, DeserializationContext context)
   diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
   index 7c93836667..1b5206b532 100644
   --- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
   +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
   @@ -230,6 +230,7 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender {
        pf.dispatcherThreads = getDispatcherThreads();
        pf.orderPolicy = getOrderPolicy();
        pf.serverLocation = this.getServerLocation();
   +    pf.enforceThreadsConnectSameReceiver = getEnforceThreadsConnectSameReceiver();
      }
    
      @Override
   ```
   With these changes, gfsh logs this error message:
   ```
   Executing - create gateway-sender --id=B --group=server-A-2 --remote-distributed-system-id=2 --enforce-threads-connect-same-receiver=false
   
     Member   | Status | Message
   ---------- | ------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
   server-A-2 | ERROR  |  java.lang.IllegalStateException: Cannot create Gateway Sender B with enforceThreadsConnectSameReceiver false because another cache has the same Gateway Sender defined with enforceThreadsConnectSameReceiver true
   ```
   There might be a problem with these changes as written though. gfsh prevents gateway senders from being created while all the members are not the current version, but thats not true for GemFire xml. 
   
   If I run a test like this with these changes:
   
   1. Start locator with an older version
   2. Start several servers with GemFire xml
   3. Bounce locator to a newer version
   4. Start server with the newer version
   
   In this case, I see an exception like this in the new server when it tries to deserialize a GatewaySenderProfile from an older member:
   ```
   [fatal 2021/01/05 14:01:51.543 PST server-A-new <P2P message reader for 10.166.145.67(server-A-2:97374)<v2>:41002(version:GEODE 1.13.0) shared unordered uid=12 local port=41102 remote port=63358> tid=0x2b] Error deserializing message
   java.nio.BufferUnderflowException
   	at java.nio.Buffer.nextGetIndex(Buffer.java:500)
   	at java.nio.DirectByteBuffer.get(DirectByteBuffer.java:249)
   	at org.apache.geode.internal.tcp.ByteBufferInputStream$ByteBufferByteSource.get(ByteBufferInputStream.java:206)
   	at org.apache.geode.internal.tcp.ByteBufferInputStream.readBoolean(ByteBufferInputStream.java:878)
   	at org.apache.geode.internal.cache.wan.GatewaySenderAdvisor$GatewaySenderProfile.fromData(GatewaySenderAdvisor.java:592)
   ```
   I'm not sure if this case needs to be supported. If so, then the GatewaySenderProfile toData/fromData will have to handle versioning.
   
   btw - this same change might need to be made for the groupTransactionEvents property.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] DonalEvans commented on a change in pull request #5600: GEODE-8202: Two-step serial gw sender threads start

Posted by GitBox <gi...@apache.org>.
DonalEvans commented on a change in pull request #5600:
URL: https://github.com/apache/geode/pull/5600#discussion_r508871988



##########
File path: geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
##########
@@ -348,11 +349,56 @@ public void destroyConnection() {
     }
   }
 
+  Connection retryInitializeConnection(Connection con) {
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+    ServerLocation server = this.sender.getServerLocation();
+    String connectedServerId = con.getEndpoint().getMemberId().getUniqueId();
+    String expectedServerId = this.processor.getExpectedReceiverUniqueId();
+    boolean connectedToExpectedReceiver = connectedServerId.equals(expectedServerId);
+    if (expectedServerId.equals("")) {
+      if (isDebugEnabled) {
+        logger.debug("First dispatcher connected to endpoint " + connectedServerId);
+      }
+      this.processor.setExpectedReceiverUniqueId(connectedServerId);
+      connectedToExpectedReceiver = true;
+    }
+    int attempt = 0;
+    final int maxAttempts = 5;

Review comment:
       When this method is invoked, do we expect that the first call to `acquireConnection()` should result in a connection to the desired server, or is there an element of randomness in which server we end up getting connected to, hence the retries? If it's the latter, this method could prove unusable for situations in which the chance of getting the correct server at random is low.

##########
File path: geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
##########
@@ -472,6 +516,12 @@ private GatewaySenderException getInitializeConnectionExceptionToThrow(
                 "No available connection was found, but the following active servers exist: %s",
                 buffer.toString());
       }
+      if (this.sender.getEnforceThreadsConnectSameReceiver()) {
+        if (Pattern.compile("Cannot get connection to .* after .* attempts.")

Review comment:
       This matching may fail if the exception message generated in `retryInitializeConnection()` is changed. Is there a way to prevent future changes to the code from breaking this check?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] alb3rtobr commented on a change in pull request #5600: GEODE-8202: Two-step serial gw sender threads start

Posted by GitBox <gi...@apache.org>.
alb3rtobr commented on a change in pull request #5600:
URL: https://github.com/apache/geode/pull/5600#discussion_r534093886



##########
File path: geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
##########
@@ -348,11 +349,56 @@ public void destroyConnection() {
     }
   }
 
+  Connection retryInitializeConnection(Connection con) {
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+    ServerLocation server = this.sender.getServerLocation();
+    String connectedServerId = con.getEndpoint().getMemberId().getUniqueId();
+    String expectedServerId = this.processor.getExpectedReceiverUniqueId();
+    boolean connectedToExpectedReceiver = connectedServerId.equals(expectedServerId);
+    if (expectedServerId.equals("")) {
+      if (isDebugEnabled) {
+        logger.debug("First dispatcher connected to endpoint " + connectedServerId);
+      }
+      this.processor.setExpectedReceiverUniqueId(connectedServerId);
+      connectedToExpectedReceiver = true;
+    }
+    int attempt = 0;
+    final int maxAttempts = 5;

Review comment:
       With the approach I was suggesting, we will have `x * y ` attempts to connect to the desired receiver, being `x=5` (or any other number) and `y=<number of receivers under same ip and port>`.
   
   I think this is a high enough number to guarantee we reach the receiver we want. As you said, we are not setting the probability of not reaching the proper server to zero, but I think we are reducing it to a reasonable value.
   
   I will add a commit with the change.
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] alb3rtobr commented on pull request #5600: GEODE-8202: Two-step serial gw sender threads start

Posted by GitBox <gi...@apache.org>.
alb3rtobr commented on pull request #5600:
URL: https://github.com/apache/geode/pull/5600#issuecomment-757963215


   Thanks for the detailed review @boglesby . I have created a test case to verify the changes you provided to avoid the creation of several senders with different value of `enforceThreadsConnectToSameReceiver`. Im moving this PR to draft until I finish to implement all your comments.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] boglesby commented on a change in pull request #5600: GEODE-8202: Two-step serial gw sender threads start

Posted by GitBox <gi...@apache.org>.
boglesby commented on a change in pull request #5600:
URL: https://github.com/apache/geode/pull/5600#discussion_r553566356



##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
##########
@@ -146,6 +146,10 @@
    */
   private int batchSize;
 
+  private String expectedReceiverUniqueId = "";

Review comment:
       I think the expectedReceiverUniqueId should be on AbstractGatewaySender (like serverLocation).
   

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
##########
@@ -146,6 +146,10 @@
    */
   private int batchSize;
 
+  private String expectedReceiverUniqueId = "";
+
+  private boolean enforceThreadsConnectSameReceiver = false;
+

Review comment:
       AbstractGatewaySenderEventProcessor defines the enforceThreadsConnectSameReceiver but it doesn't need to since AbstractGatewaySender already defines it and the processor has a reference to the sender. Removing this attribute will simplify some of this code.
   
   The changes to RemoteConcurrentSerialGatewaySenderEventProcessor and SerialGatewaySenderImpl would be eliminated.
   
   ConcurrentSerialGatewaySenderEventProcessor can be changed to reference the value in the sender (sender.getEnforceThreadsConnectSameReceiver()).
   
   Here is a diff with those changes:
   ```
   diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
   index 1fc160ebb0..3a20e3020b 100644
   --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
   +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
   @@ -148,8 +148,6 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
    
      private String expectedReceiverUniqueId = "";
    
   -  private boolean enforceThreadsConnectSameReceiver = false;
   -
      public AbstractGatewaySenderEventProcessor(String string,
          GatewaySender sender, ThreadsMonitoring tMonitoring) {
        super(string);
   @@ -158,13 +156,6 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
        this.threadMonitoring = tMonitoring;
      }
    
   -  public AbstractGatewaySenderEventProcessor(String string,
   -      GatewaySender sender, ThreadsMonitoring tMonitoring,
   -      boolean enforceThreadsConnectSameReceiver) {
   -    this(string, sender, tMonitoring);
   -    this.enforceThreadsConnectSameReceiver = enforceThreadsConnectSameReceiver;
   -  }
   -
      public void setExpectedReceiverUniqueId(String uniqueId) {
        this.expectedReceiverUniqueId = uniqueId;
      }
   @@ -173,14 +164,6 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread
        return this.expectedReceiverUniqueId;
      }
    
   -  public void setEnforceThreadsConnectSameReceiver(boolean value) {
   -    this.enforceThreadsConnectSameReceiver = value;
   -  }
   -
   -  public boolean getEnforceThreadsConnectSameReceiver() {
   -    return this.enforceThreadsConnectSameReceiver;
   -  }
   -
      public Object getRunningStateLock() {
        return runningStateLock;
      }
   diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
   index 9cf7487a40..7adf99640f 100644
   --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
   +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
   @@ -77,20 +77,6 @@ public class ConcurrentSerialGatewaySenderEventProcessor
        }
      }
    
   -  public ConcurrentSerialGatewaySenderEventProcessor(AbstractGatewaySender sender,
   -      ThreadsMonitoring tMonitoring, boolean cleanQueues,
   -      boolean enforceThreadsConnectSameReceiver) {
   -    super("Event Processor for GatewaySender_" + sender.getId(), sender, tMonitoring,
   -        enforceThreadsConnectSameReceiver);
   -    this.sender = sender;
   -
   -    initializeMessageQueue(sender.getId(), cleanQueues);
   -    queues = new HashSet<RegionQueue>();
   -    for (SerialGatewaySenderEventProcessor processor : processors) {
   -      queues.add(processor.getQueue());
   -    }
   -  }
   -
      @Override
      public int getTotalQueueSize() {
        int totalSize = 0;
   @@ -194,7 +180,7 @@ public class ConcurrentSerialGatewaySenderEventProcessor
      @Override
      public void run() {
        boolean isDebugEnabled = logger.isDebugEnabled();
   -    if (getEnforceThreadsConnectSameReceiver()) {
   +    if (this.sender.getEnforceThreadsConnectSameReceiver()) {
          this.processors.get(0).start();
          waitForRunningStatus(this.processors.get(0));
          String receiverUniqueId = this.processors.get(0).getExpectedReceiverUniqueId();
   @@ -206,7 +192,8 @@ public class ConcurrentSerialGatewaySenderEventProcessor
          }
        }
    
   -    for (int i = getEnforceThreadsConnectSameReceiver() ? 1 : 0; i < this.processors.size(); i++) {
   +    for (int i = this.sender.getEnforceThreadsConnectSameReceiver() ? 1 : 0; i < this.processors
   +        .size(); i++) {
          if (isDebugEnabled) {
            logger.debug("Starting the serialProcessor {}", i);
          }
   diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java
   index 306a2a4937..7139307d7f 100644
   --- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java
   +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteConcurrentSerialGatewaySenderEventProcessor.java
   @@ -30,20 +30,11 @@ public class RemoteConcurrentSerialGatewaySenderEventProcessor
        super(sender, tMonitoring, cleanQueues);
      }
    
   -  public RemoteConcurrentSerialGatewaySenderEventProcessor(AbstractGatewaySender sender,
   -      ThreadsMonitoring tMonitoring, boolean cleanQueues,
   -      boolean enforceThreadsConnectSameReceiver) {
   -    super(sender, tMonitoring, cleanQueues, enforceThreadsConnectSameReceiver);
   -  }
   -
      @Override
      protected void initializeMessageQueue(String id, boolean cleanQueues) {
        for (int i = 0; i < sender.getDispatcherThreads(); i++) {
   -      SerialGatewaySenderEventProcessor processor =
   -          new RemoteSerialGatewaySenderEventProcessor(this.sender, id + "." + i,
   -              getThreadMonitorObj(), cleanQueues);
   -      processor.setEnforceThreadsConnectSameReceiver(getEnforceThreadsConnectSameReceiver());
   -      processors.add(processor);
   +      processors.add(new RemoteSerialGatewaySenderEventProcessor(this.sender, id + "." + i,
   +          getThreadMonitorObj(), cleanQueues));
          if (logger.isDebugEnabled()) {
            logger.debug("Created the RemoteSerialGatewayEventProcessor_{}->{}", i, processors.get(i));
          }
   diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
   index 7c93836667..3474b4a3c5 100644
   --- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
   +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
   @@ -120,9 +120,7 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender {
        AbstractGatewaySenderEventProcessor eventProcessor;
        if (getDispatcherThreads() > 1) {
          eventProcessor = new RemoteConcurrentSerialGatewaySenderEventProcessor(
   -          SerialGatewaySenderImpl.this, getThreadMonitorObj(), cleanQueues,
   -          enforceThreadsConnectSameReceiver);
   -      // eventProcessor.setEnforceThreadsConnectSameReceiver(enforceThreadsConnectSameReceiver);
   +          SerialGatewaySenderImpl.this, getThreadMonitorObj(), cleanQueues);
        } else {
          eventProcessor = new RemoteSerialGatewaySenderEventProcessor(SerialGatewaySenderImpl.this,
              getId(), getThreadMonitorObj(), cleanQueues);
   ```

##########
File path: geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
##########
@@ -362,11 +366,71 @@ public void destroyConnection() {
     }
   }
 
+  Connection retryInitializeConnection(Connection con) {
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+    ServerLocation server = this.sender.getServerLocation();
+    String connectedServerId = con.getEndpoint().getMemberId().getUniqueId();
+    String expectedServerId = this.processor.getExpectedReceiverUniqueId();
+
+    if (expectedServerId.equals("")) {
+      if (isDebugEnabled) {
+        logger.debug("First dispatcher connected to server " + connectedServerId);
+      }
+      this.processor.setExpectedReceiverUniqueId(connectedServerId);
+      return con;
+    }
+
+    int attempt = 0;
+    final int attemptsPerServer = 5;

Review comment:
       Should attemptsPerServer be configurable?
   

##########
File path: geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
##########
@@ -362,11 +366,71 @@ public void destroyConnection() {
     }
   }
 
+  Connection retryInitializeConnection(Connection con) {
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+    ServerLocation server = this.sender.getServerLocation();

Review comment:
       The server variable is unused




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] davebarnes97 commented on pull request #5600: GEODE-8202: Two-step serial gw sender threads start

Posted by GitBox <gi...@apache.org>.
davebarnes97 commented on pull request #5600:
URL: https://github.com/apache/geode/pull/5600#issuecomment-705557152


   @alb3rtobr Thank you for improving the parameter name. With that in place, here is a suggested doc edit. I'm using present tense, rather than future tense, where possible, and changing some singular/plural to comply with common usage.
   
   <td>This parameter applies only to serial gateway senders. If true, receiver member id is checked by all dispatcher threads when the connection is established to ensure they connect to the same receiver. Instead of starting all dispatcher threads in parallel, one thread is started first, and after that the rest are started in parallel.</td>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] onichols-pivotal commented on pull request #5600: GEODE-8202: Two-step serial gw sender threads start

Posted by GitBox <gi...@apache.org>.
onichols-pivotal commented on pull request #5600:
URL: https://github.com/apache/geode/pull/5600#issuecomment-759826939


   PR checks are failing to report status due to a rebase conflict.  Please rebase with latest develop to get unstuck.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] DonalEvans commented on a change in pull request #5600: GEODE-8202: Two-step serial gw sender threads start

Posted by GitBox <gi...@apache.org>.
DonalEvans commented on a change in pull request #5600:
URL: https://github.com/apache/geode/pull/5600#discussion_r533809728



##########
File path: geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
##########
@@ -348,11 +349,56 @@ public void destroyConnection() {
     }
   }
 
+  Connection retryInitializeConnection(Connection con) {
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+    ServerLocation server = this.sender.getServerLocation();
+    String connectedServerId = con.getEndpoint().getMemberId().getUniqueId();
+    String expectedServerId = this.processor.getExpectedReceiverUniqueId();
+    boolean connectedToExpectedReceiver = connectedServerId.equals(expectedServerId);
+    if (expectedServerId.equals("")) {
+      if (isDebugEnabled) {
+        logger.debug("First dispatcher connected to endpoint " + connectedServerId);
+      }
+      this.processor.setExpectedReceiverUniqueId(connectedServerId);
+      connectedToExpectedReceiver = true;
+    }
+    int attempt = 0;
+    final int maxAttempts = 5;

Review comment:
       I feel a little uneasy about the idea of something that could fail at random just due to bad luck. There's a non-zero (albeit small) chance that we would never get the "right" receiver using this approach, and the number of retries necessary to ensure we have some minimum probability of getting the right receiver increases rapidly with each receiver that's sharing the same ip and port.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] alb3rtobr commented on pull request #5600: GEODE-8202: Two-step serial gw sender threads start

Posted by GitBox <gi...@apache.org>.
alb3rtobr commented on pull request #5600:
URL: https://github.com/apache/geode/pull/5600#issuecomment-760753273


   Hi @boglesby , you can check my changes after your review here: https://github.com/apache/geode/pull/5900
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] alb3rtobr commented on a change in pull request #5600: GEODE-8202: Two-step serial gw sender threads start

Posted by GitBox <gi...@apache.org>.
alb3rtobr commented on a change in pull request #5600:
URL: https://github.com/apache/geode/pull/5600#discussion_r555049292



##########
File path: geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
##########
@@ -362,11 +366,71 @@ public void destroyConnection() {
     }
   }
 
+  Connection retryInitializeConnection(Connection con) {
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+    ServerLocation server = this.sender.getServerLocation();
+    String connectedServerId = con.getEndpoint().getMemberId().getUniqueId();
+    String expectedServerId = this.processor.getExpectedReceiverUniqueId();
+
+    if (expectedServerId.equals("")) {
+      if (isDebugEnabled) {
+        logger.debug("First dispatcher connected to server " + connectedServerId);
+      }
+      this.processor.setExpectedReceiverUniqueId(connectedServerId);
+      return con;
+    }
+
+    int attempt = 0;
+    final int attemptsPerServer = 5;

Review comment:
       Take into account that this value is per server: if we have three receivers sharing the same ip and port, we will have 15 attempts. For me its sounds like a reasonable number of attempts.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] alb3rtobr commented on pull request #5600: GEODE-8202: Two-step serial gw sender threads start

Posted by GitBox <gi...@apache.org>.
alb3rtobr commented on pull request #5600:
URL: https://github.com/apache/geode/pull/5600#issuecomment-747681619


   > there is a lack of DUnit testing to verify this new feature is working as intended, which would be good to have.
   
   I have added a new test case to verify the feature. It is included in `SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java`, which setup two clusters with this configuration, so I think it was the right place.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] alb3rtobr commented on pull request #5600: GEODE-8202: Two-step serial gw sender threads start

Posted by GitBox <gi...@apache.org>.
alb3rtobr commented on pull request #5600:
URL: https://github.com/apache/geode/pull/5600#issuecomment-758526673


   > btw - this same change might need to be made for the groupTransactionEvents property.
   
   I will create a separate ticket and PR for this change, as it is not related with the feature Im adding in this ticket.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org