You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by la...@apache.org on 2018/12/17 21:50:54 UTC

[geode] branch develop updated (deb8d1e -> 213e07f)

This is an automated email from the ASF dual-hosted git repository.

ladyvader pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git.


    from deb8d1e  GEODE-6209 Remove anchor/links for cache.xml subsections in docs (#3009)
     new ea62d29  GEODE-6177: WAN Event processing continues after authentication fails
     new 10668f5  GEODE-6177: wait for queue processing in test
     new 14fee9a  GEODE-6177: Spotless
     new 213e07f  GEODE-6177: make test hook visible across threads

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../geode/pdx/PdxRegistryMismatchException.java    |   2 +-
 .../wan/misc/NewWanAuthenticationDUnitTest.java    | 637 ++++++++++++---------
 .../wan/GatewaySenderEventRemoteDispatcher.java    | 145 +++--
 3 files changed, 454 insertions(+), 330 deletions(-)


[geode] 03/04: GEODE-6177: Spotless

Posted by la...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ladyvader pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 14fee9aaa1e1fe82cc0adfce98df29521279e981
Author: Ryan McMahon <rm...@pivotal.io>
AuthorDate: Fri Dec 14 16:51:52 2018 -0800

    GEODE-6177: Spotless
---
 .../cache/wan/misc/NewWanAuthenticationDUnitTest.java       | 13 ++++++++-----
 .../cache/wan/GatewaySenderEventRemoteDispatcher.java       |  3 ++-
 2 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java
index c4cb5f2..48d5be9 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java
@@ -407,10 +407,12 @@ public class NewWanAuthenticationDUnitTest extends WANTestBase {
     // sender.invoke(() -> verifyDifferentServerInGetCredentialCall());
   }
 
-  private void doPutsAndVerifyQueueSizeAfterProcessing(final String regionName, final int numPuts,
-                                                       final boolean shouldBeConnected,
-                                                       final boolean isQueueBlocked,
-                                                       final boolean isAckThreadRunning) {
+  private void doPutsAndVerifyQueueSizeAfterProcessing(
+      final String regionName,
+      final int numPuts,
+      final boolean shouldBeConnected,
+      final boolean isQueueBlocked,
+      final boolean isAckThreadRunning) {
     if (isQueueBlocked) {
       // caller is assuming that queue processing will not make progress
       try {
@@ -442,7 +444,8 @@ public class NewWanAuthenticationDUnitTest extends WANTestBase {
 
         checkQueueSize(senderId, numPuts);
       } finally {
-        GatewaySenderEventRemoteDispatcher.messageProcessingAttempted = isAck -> {};
+        GatewaySenderEventRemoteDispatcher.messageProcessingAttempted = isAck -> {
+        };
       }
     } else {
       doPuts(regionName, numPuts);
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
index eda2865..3b3318f 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
@@ -67,7 +67,8 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
    * Without this hook, negative tests, can't ensure that message processing was
    * attempted, so they wouldn't know how long to wait for some sort of failure.
    */
-  public static Consumer<Boolean> messageProcessingAttempted = isAck -> {};
+  public static Consumer<Boolean> messageProcessingAttempted = isAck -> {
+  };
 
   /**
    * This count is reset to 0 each time a successful connection is made.


[geode] 04/04: GEODE-6177: make test hook visible across threads

Posted by la...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ladyvader pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 213e07ff7910d33073e9d933ded252cc9fc0b881
Author: Ryan McMahon <rm...@pivotal.io>
AuthorDate: Mon Dec 17 09:33:57 2018 -0800

    GEODE-6177: make test hook visible across threads
---
 .../geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
index 3b3318f..a0b714d 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
@@ -67,7 +67,7 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
    * Without this hook, negative tests, can't ensure that message processing was
    * attempted, so they wouldn't know how long to wait for some sort of failure.
    */
-  public static Consumer<Boolean> messageProcessingAttempted = isAck -> {
+  public static volatile Consumer<Boolean> messageProcessingAttempted = isAck -> {
   };
 
   /**


[geode] 02/04: GEODE-6177: wait for queue processing in test

Posted by la...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ladyvader pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 10668f55003176efc3695e337fade94c0e554fac
Author: Ryan McMahon <rm...@pivotal.io>
AuthorDate: Fri Dec 14 16:02:30 2018 -0800

    GEODE-6177: wait for queue processing in test
---
 .../wan/misc/NewWanAuthenticationDUnitTest.java    | 79 +++++++++++++++-------
 .../wan/GatewaySenderEventRemoteDispatcher.java    | 26 ++++---
 2 files changed, 73 insertions(+), 32 deletions(-)

diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java
index e31acbb..c4cb5f2 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java
@@ -25,6 +25,7 @@ import static org.apache.geode.test.dunit.Assert.assertNotNull;
 import static org.apache.geode.test.dunit.Assert.assertTrue;
 
 import java.util.Properties;
+import java.util.concurrent.atomic.LongAdder;
 
 import org.apache.logging.log4j.Logger;
 import org.junit.Before;
@@ -37,6 +38,7 @@ import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventRemoteDispatcher;
 import org.apache.geode.internal.cache.wan.WANTestBase;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.security.AuthInitialize;
@@ -208,7 +210,7 @@ public class NewWanAuthenticationDUnitTest extends WANTestBase {
 
     sender.invoke(() -> {
       startSender(senderId);
-      doPutsAndVerifyQueue(regionName, numPuts, false, numPuts);
+      doPutsAndVerifyQueueSizeAfterProcessing(regionName, numPuts, false, true, false);
     });
 
     receiver.invoke(() -> validateRegionSize(regionName, 0));
@@ -244,7 +246,7 @@ public class NewWanAuthenticationDUnitTest extends WANTestBase {
 
     sender.invoke(() -> {
       startSender(senderId);
-      doPutsAndVerifyQueue(regionName, numPuts, false, numPuts);
+      doPutsAndVerifyQueueSizeAfterProcessing(regionName, numPuts, false, true, false);
     });
 
     receiver.invoke(() -> validateRegionSize(regionName, 0));
@@ -257,7 +259,7 @@ public class NewWanAuthenticationDUnitTest extends WANTestBase {
     });
 
     sender.invoke(() -> {
-      doPutsAndVerifyQueue(regionName, numPuts, true, 0);
+      doPutsAndVerifyQueueSizeAfterProcessing(regionName, numPuts, true, false, true);
     });
 
     receiver.invoke(() -> validateRegionSize(regionName, numPuts));
@@ -281,17 +283,7 @@ public class NewWanAuthenticationDUnitTest extends WANTestBase {
     sender.invoke(() -> {
       createSecuredCache(senderSecurityProps, null, lnPort);
       createReplicatedRegion(regionName, senderId, isOffHeap());
-      try {
-        /*
-         * Setting the gateway-connection-retry-interval to 0 ensures that the Dispatcher and
-         * AckReader eagerly retry connections, so we may detect any issues with the retry logic
-         * early.
-         */
-        System.setProperty(gatewayConnectionRetryIntervalConfigParameter, "0");
-        createSender(senderId, 2, false, 100, 10, false, false, null, true);
-      } finally {
-        System.clearProperty(gatewayConnectionRetryIntervalConfigParameter);
-      }
+      createSender(senderId, 2, false, 100, 10, false, false, null, true);
     });
 
     receiver.invoke(() -> {
@@ -300,7 +292,7 @@ public class NewWanAuthenticationDUnitTest extends WANTestBase {
 
     sender.invoke(() -> {
       startSender(senderId);
-      doPutsAndVerifyQueue(regionName, numPuts, true, 0);
+      doPutsAndVerifyQueueSizeAfterProcessing(regionName, numPuts, true, false, true);
     });
 
     receiver.invoke(() -> validateRegionSize(regionName, numPuts));
@@ -315,7 +307,7 @@ public class NewWanAuthenticationDUnitTest extends WANTestBase {
     });
 
     sender.invoke(() -> {
-      doPutsAndVerifyQueue(regionName, numPuts, false, numPuts);
+      doPutsAndVerifyQueueSizeAfterProcessing(regionName, numPuts, false, true, true);
     });
 
     receiver.invoke(() -> validateRegionSize(regionName, 0));
@@ -329,8 +321,12 @@ public class NewWanAuthenticationDUnitTest extends WANTestBase {
     });
 
     sender.invoke(() -> {
-      // Data should be able to flow properly after valid credentials have been restored.
-      doPutsAndVerifyQueue(regionName, numPuts, true, 0);
+      /*
+       * Data should be able to flow properly after valid credentials have been restored.
+       * No more puts are needed because we already have numPuts queued from when credentials
+       * were invalid (see above).
+       */
+      doPutsAndVerifyQueueSizeAfterProcessing(regionName, 0, true, false, true);
     });
 
     receiver.invoke(() -> validateRegionSize(regionName, numPuts));
@@ -411,12 +407,49 @@ public class NewWanAuthenticationDUnitTest extends WANTestBase {
     // sender.invoke(() -> verifyDifferentServerInGetCredentialCall());
   }
 
-  private void doPutsAndVerifyQueue(final String regionName, final int numPuts,
-      final boolean shouldBeConnected,
-      final int expectedQueueSize) {
-    doPuts(regionName, numPuts);
+  private void doPutsAndVerifyQueueSizeAfterProcessing(final String regionName, final int numPuts,
+                                                       final boolean shouldBeConnected,
+                                                       final boolean isQueueBlocked,
+                                                       final boolean isAckThreadRunning) {
+    if (isQueueBlocked) {
+      // caller is assuming that queue processing will not make progress
+      try {
+        final LongAdder dispatchAttempts = new LongAdder();
+        final LongAdder ackReadAttempts = new LongAdder();
+
+        GatewaySenderEventRemoteDispatcher.messageProcessingAttempted = isAck -> {
+          if (isAck)
+            ackReadAttempts.increment();
+          else
+            dispatchAttempts.increment();
+        };
+
+        doPuts(regionName, numPuts);
+
+        /*
+         * The game here is to ensure that both the dispatcher thread and the ack reader thread
+         * each get at least one whack at processing (a batch or an ack, respectively).
+         * Note: both those conditions aren't obviously necessary from our method signature, but
+         * trust us: callers rely on that guarantee! (the "Processing" in the method name
+         * implies that _both_ threads tried).
+         *
+         * Notice the particular awfulness of the second term in the conditional below. Callers
+         * have to send us a flag to tell us that the ack reader thread is not running so we know
+         * not to look for its attempts.
+         */
+        await().until(() -> dispatchAttempts.sum() > 0 &&
+            (!isAckThreadRunning || ackReadAttempts.sum() > 0));
+
+        checkQueueSize(senderId, numPuts);
+      } finally {
+        GatewaySenderEventRemoteDispatcher.messageProcessingAttempted = isAck -> {};
+      }
+    } else {
+      doPuts(regionName, numPuts);
+      // caller is assuming queue will drain eventually
+      checkQueueSize(senderId, 0);
+    }
     verifyRunningWithConnectedState(senderId, shouldBeConnected);
-    checkQueueSize(senderId, expectedQueueSize);
   }
 
   private void createSecuredReceiver(Integer nyPort, String regionName,
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
index 6951e32..eda2865 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
@@ -19,6 +19,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
 
 import org.apache.logging.log4j.Logger;
 
@@ -60,6 +61,14 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
 
   private ReentrantReadWriteLock connectionLifeCycleLock = new ReentrantReadWriteLock();
 
+  /*
+   * Called after each attempt at processing an outbound (dispatch) or inbound (ack)
+   * message, whether the attempt is successful or not. The purpose is testability.
+   * Without this hook, negative tests, can't ensure that message processing was
+   * attempted, so they wouldn't know how long to wait for some sort of failure.
+   */
+  public static Consumer<Boolean> messageProcessingAttempted = isAck -> {};
+
   /**
    * This count is reset to 0 each time a successful connection is made.
    */
@@ -129,7 +138,10 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
       } else {
         logAndStopProcessor(ex);
       }
+    } finally {
+      messageProcessingAttempted.accept(true);
     }
+
     return ack;
   }
 
@@ -163,6 +175,8 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
       throw e;
     } catch (Exception e) {
       logAndStopProcessor(e);
+    } finally {
+      messageProcessingAttempted.accept(false);
     }
     return success;
   }
@@ -821,11 +835,8 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
 
   private static class RecoverableExceptionPredicates {
 
-    /**
-     * Certain exception types are considered recoverable when reading an acknowledgement.
-     */
     static boolean isRecoverableWhenReadingAck(final Exception ex) {
-      /**
+      /*
        * It is considered non-recoverable if the PDX registry files are deleted from the sending
        * side of a WAN Gateway. This is determined by checking if the cause of the
        * {@link ServerConnectivityException} is caused by a {@link PdxRegistryMismatchException}
@@ -835,11 +846,8 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
               && !(ex.getCause() instanceof PdxRegistryMismatchException));
     }
 
-    /**
-     * Certain exception types are considered recoverable when dispatching a batch.
-     */
     static boolean isRecoverableWhenDispatchingBatch(final Throwable t) {
-      /**
+      /*
        * We consider {@link ServerConnectivityException} to be a temporary connectivity issue and
        * is therefore recoverable. The {@link IllegalStateException} can occur if off-heap is used,
        * and a GatewaySenderEventImpl is serialized after being freed. This can happen if the
@@ -855,7 +863,7 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
      * reading an acknowledgement.
      */
     private static boolean isRecoverableInAllCases(final Throwable t) {
-      /**
+      /*
        * {@link IOException} and {@link ConnectionDestroyedException} can occur
        * due to temporary network issues and therefore are recoverable.
        * {@link GemFireSecurityException} represents an inability to authenticate with the


[geode] 01/04: GEODE-6177: WAN Event processing continues after authentication fails

Posted by la...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ladyvader pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git

commit ea62d29cf3307cbd175c018f5a1ead313f511eed
Author: Ryan McMahon <rm...@pivotal.io>
AuthorDate: Mon Dec 10 16:02:30 2018 -0800

    GEODE-6177: WAN Event processing continues after authentication fails
    
    Due to different handling in the GatewaySenderEventRemoteDispatcher
    dispatcher and ack reader threads, it was possible for event processing
    to stop when a GemFireSecurityException was encountered by the ack
    reader connection retry logic.
    
    This commit attempts to share common recoverable cases between the ack
    reader and dispatcher, while maintaining the cases which are specific to
    each.  We also added a test which ensures that if a connection is denied
    due to invalid credentials upon a restart of the receiver, the
    sender can recover if it again provides valid credentials.  In the process,
    we removed a significant amount of duplicated and noisy code in the
    NewWanAuthenticationDUnitTests.
    
    Co-authored-by: Bill Burcham <bb...@pivotal.io>
    Co-authored-by: Ryan McMahon <rm...@pivotal.io>
---
 .../geode/pdx/PdxRegistryMismatchException.java    |   2 +-
 .../wan/misc/NewWanAuthenticationDUnitTest.java    | 601 +++++++++++----------
 .../wan/GatewaySenderEventRemoteDispatcher.java    | 136 +++--
 3 files changed, 409 insertions(+), 330 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/pdx/PdxRegistryMismatchException.java b/geode-core/src/main/java/org/apache/geode/pdx/PdxRegistryMismatchException.java
index 1317a32..e71695f 100644
--- a/geode-core/src/main/java/org/apache/geode/pdx/PdxRegistryMismatchException.java
+++ b/geode-core/src/main/java/org/apache/geode/pdx/PdxRegistryMismatchException.java
@@ -17,7 +17,7 @@ package org.apache.geode.pdx;
 import org.apache.geode.GemFireException;
 
 /**
- * Thrown when a an attempt is made to reuse a PDX Type. This can occur if the PDX registry files
+ * Thrown when an attempt is made to reuse a PDX Type. This can occur if the PDX registry files
  * are deleted from the sending side of a WAN Gateway.
  */
 public class PdxRegistryMismatchException extends GemFireException {
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java
index 43c82aa..e31acbb 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java
@@ -27,6 +27,7 @@ import static org.apache.geode.test.dunit.Assert.assertTrue;
 import java.util.Properties;
 
 import org.apache.logging.log4j.Logger;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -34,6 +35,7 @@ import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.Region;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.cache.wan.WANTestBase;
 import org.apache.geode.internal.logging.LogService;
@@ -44,14 +46,34 @@ import org.apache.geode.security.TestSecurityManager;
 import org.apache.geode.security.generator.CredentialGenerator;
 import org.apache.geode.security.generator.DummyCredentialGenerator;
 import org.apache.geode.security.templates.UserPasswordAuthInit;
+import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.junit.categories.WanTest;
 
 @Category({WanTest.class})
 public class NewWanAuthenticationDUnitTest extends WANTestBase {
 
-  public static final Logger logger = LogService.getLogger();
+  private static final Logger logger = LogService.getLogger();
 
-  public static boolean isDifferentServerInGetCredentialCall = false;
+  private static boolean isDifferentServerInGetCredentialCall = false;
+
+  private static final String securityJsonResource =
+      "org/apache/geode/security/templates/security.json";
+  private static final String senderId = "ln";
+  private static final int numPuts = 10;
+
+  private Integer lnPort;
+  private Integer nyPort;
+  private String regionName;
+  private final VM sender = vm2;
+  private final VM receiver = vm3;
+
+  @Before
+  public void setup() {
+    disconnectAllFromDS();
+    lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+    regionName = getTestMethodName() + "_RR";
+  }
 
   /**
    * Authentication test for new WAN with valid credentials. Although, nothing related to
@@ -60,112 +82,84 @@ public class NewWanAuthenticationDUnitTest extends WANTestBase {
    */
   @Test
   public void testWanAuthValidCredentials() {
-    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
-    logger.info("Created locator on local site");
-
-    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
-    logger.info("Created locator on remote site");
-
-
-    CredentialGenerator gen = new DummyCredentialGenerator();
-    Properties extraProps = gen.getSystemProperties();
+    final CredentialGenerator credentialGenerator = new DummyCredentialGenerator();
 
-    String clientauthenticator = gen.getAuthenticator();
-    String clientauthInit = gen.getAuthInit();
+    final Properties extraProps = credentialGenerator.getSystemProperties();
 
-    Properties credentials1 = gen.getValidCredentials(1);
+    final Properties senderCredentials = credentialGenerator.getValidCredentials(1);
     if (extraProps != null) {
-      credentials1.putAll(extraProps);
+      senderCredentials.putAll(extraProps);
     }
-    Properties javaProps1 = gen.getJavaProperties();
+    final Properties senderJavaProps = credentialGenerator.getJavaProperties();
 
-    // vm3's invalid credentials
-    Properties credentials2 = gen.getInvalidCredentials(1);
+    // receiver's invalid credentials
+    final Properties receiverCredentials = credentialGenerator.getInvalidCredentials(1);
     if (extraProps != null) {
-      credentials2.putAll(extraProps);
+      receiverCredentials.putAll(extraProps);
     }
-    Properties javaProps2 = gen.getJavaProperties();
+    final Properties receiverJavaProps = credentialGenerator.getJavaProperties();
 
-    Properties props1 =
-        buildProperties(clientauthenticator, clientauthInit, null, credentials1, null);
+    final String clientAuthenticator = credentialGenerator.getAuthenticator();
+    final String clientAuthInit = credentialGenerator.getAuthInit();
 
-    // have vm 3 start a cache with invalid credentails
-    Properties props2 =
-        buildProperties(clientauthenticator, clientauthInit, null, credentials2, null);
+    final Properties senderSecurityProps =
+        buildProperties(clientAuthenticator, clientAuthInit, null, senderCredentials, null);
+    // have receiver start a cache with invalid credentials
+    final Properties receiverSecurityProps =
+        buildProperties(clientAuthenticator, clientAuthInit, null, receiverCredentials, null);
 
-    vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(props1, javaProps1, lnPort));
-    logger.info("Created secured cache in vm2");
+    // ------------------------------- Set Up With Valid Credentials -------------------------------
 
-    vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(props2, javaProps2, nyPort));
-    logger.info("Created secured cache in vm3");
+    sender.invoke(() -> createSecuredCache(senderSecurityProps, senderJavaProps, lnPort));
+    receiver.invoke(() -> createSecuredCache(receiverSecurityProps, receiverJavaProps, nyPort));
 
-    vm2.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true));
-    logger.info("Created sender in vm2");
+    sender.invoke(() -> createSender("ln", 2, false, 100, 10, false, false, null, true));
+    receiver.invoke(() -> createReceiverInSecuredCache());
 
-    vm3.invoke(() -> createReceiverInSecuredCache());
-    logger.info("Created receiver in vm3");
+    sender.invoke(
+        () -> createReplicatedRegion(regionName, "ln", isOffHeap()));
+    receiver.invoke(
+        () -> createReplicatedRegion(regionName, null, isOffHeap()));
 
-    vm2.invoke(
-        () -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", "ln", isOffHeap()));
-    logger.info("Created RR in vm2");
-    vm3.invoke(
-        () -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", null, isOffHeap()));
-    logger.info("Created RR in vm3");
+    // this tests verifies that even though the receiver has invalid credentials, the sender can
+    // still send data to
+    // the receiver because the sender has valid credentials
+    sender.invoke(() -> startSender("ln"));
+    sender.invoke(() -> waitForSenderRunningState("ln"));
 
-    // this tests verifies that even though vm3 has invalid credentials, vm2 can still send data to
-    // vm3 because
-    // vm2 has valid credentials
-    vm2.invoke(() -> WANTestBase.startSender("ln"));
-    vm2.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
-
-    vm2.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1));
-    vm3.invoke(() -> {
-      Region r = cache.getRegion(Region.SEPARATOR + getTestMethodName() + "_RR");
+    sender.invoke(() -> doPuts(regionName, 1));
+    receiver.invoke(() -> {
+      Region r = cache.getRegion(Region.SEPARATOR + regionName);
       await().untilAsserted(() -> assertTrue(r.size() > 0));
     });
-    logger.info("Done successfully.");
   }
 
   @Test
   public void testWanIntegratedSecurityWithValidCredentials() {
-    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
-    logger.info("Created locator on local site");
-
-    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
-    logger.info("Created locator on remote site");
-
-
-    Properties props1 = buildSecurityProperties("admin", "secret");
-    Properties props2 = buildSecurityProperties("guest", "guest");
+    final Properties senderSecurityProps = buildSecurityProperties("admin", "secret");
+    final Properties receiverSecurityProps = buildSecurityProperties("guest", "guest");
 
-    vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(props1, null, lnPort));
-    logger.info("Created secured cache in vm2");
+    // ------------------------------- Set Up With Valid Credentials -------------------------------
 
-    vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(props2, null, nyPort));
-    logger.info("Created secured cache in vm3");
+    sender.invoke(() -> createSecuredCache(senderSecurityProps, null, lnPort));
+    receiver.invoke(() -> createSecuredCache(receiverSecurityProps, null, nyPort));
 
-    vm2.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true));
-    logger.info("Created sender in vm2");
+    sender.invoke(() -> createSender("ln", 2, false, 100, 10, false, false, null, true));
+    receiver.invoke(() -> createReceiverInSecuredCache());
 
-    vm3.invoke(() -> createReceiverInSecuredCache());
-    logger.info("Created receiver in vm3");
+    sender.invoke(
+        () -> createReplicatedRegion(regionName, "ln", isOffHeap()));
+    receiver.invoke(
+        () -> createReplicatedRegion(regionName, null, isOffHeap()));
 
-    vm2.invoke(
-        () -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", "ln", isOffHeap()));
-    logger.info("Created RR in vm2");
-    vm3.invoke(
-        () -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", null, isOffHeap()));
-    logger.info("Created RR in vm3");
+    sender.invoke(() -> startSender("ln"));
+    sender.invoke(() -> waitForSenderRunningState("ln"));
+    sender.invoke(() -> doPuts(regionName, 1));
 
-    vm2.invoke(() -> WANTestBase.startSender("ln"));
-    vm2.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
-    vm2.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1));
-    vm3.invoke(() -> {
-      Region r = cache.getRegion(Region.SEPARATOR + getTestMethodName() + "_RR");
+    receiver.invoke(() -> {
+      Region r = cache.getRegion(Region.SEPARATOR + regionName);
       await().untilAsserted(() -> assertTrue(r.size() > 0));
-
     });
-    logger.info("Done successfully.");
   }
 
   /**
@@ -175,66 +169,49 @@ public class NewWanAuthenticationDUnitTest extends WANTestBase {
    */
   @Test
   public void testWanAuthInvalidCredentials() {
-    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
-    logger.info("Created locator on local site");
-
-    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
-    logger.info("Created locator on remote site");
-
-
-    CredentialGenerator gen = new DummyCredentialGenerator();
-    logger.info("Picked up credential: " + gen);
+    final CredentialGenerator credentialGenerator = new DummyCredentialGenerator();
 
-    Properties extraProps = gen.getSystemProperties();
+    final Properties extraProps = credentialGenerator.getSystemProperties();
 
-    String clientauthenticator = gen.getAuthenticator();
-    String clientauthInit = gen.getAuthInit();
-
-    Properties credentials1 = gen.getInvalidCredentials(1);
+    final Properties senderCredentials = credentialGenerator.getInvalidCredentials(1);
     if (extraProps != null) {
-      credentials1.putAll(extraProps);
+      senderCredentials.putAll(extraProps);
     }
-    Properties javaProps1 = gen.getJavaProperties();
-    Properties credentials2 = gen.getInvalidCredentials(2);
+    final Properties senderJavaProperties = credentialGenerator.getJavaProperties();
+
+    final Properties receiverCredentials = credentialGenerator.getInvalidCredentials(2);
     if (extraProps != null) {
-      credentials2.putAll(extraProps);
+      receiverCredentials.putAll(extraProps);
     }
-    Properties javaProps2 = gen.getJavaProperties();
-
-    Properties props1 =
-        buildProperties(clientauthenticator, clientauthInit, null, credentials1, null);
-    Properties props2 =
-        buildProperties(clientauthenticator, clientauthInit, null, credentials2, null);
-
-    logger.info("Done building auth properties");
+    final Properties receiverJavaProps = credentialGenerator.getJavaProperties();
 
-    vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(props1, javaProps1, lnPort));
-    logger.info("Created secured cache in vm2");
+    final String clientAuthenticator = credentialGenerator.getAuthenticator();
+    final String clientAuthInit = credentialGenerator.getAuthInit();
 
-    vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(props2, javaProps2, nyPort));
-    logger.info("Created secured cache in vm3");
+    final Properties senderSecurityProps =
+        buildProperties(clientAuthenticator, clientAuthInit, null, senderCredentials, null);
+    final Properties receiverSecurityPropsWithIncorrectSenderCreds =
+        buildProperties(clientAuthenticator, clientAuthInit, null, receiverCredentials, null);
 
-    vm2.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true));
-    logger.info("Created sender in vm2");
+    // ------------------------------ Set Up With Invalid Credentials ------------------------------
 
-    vm3.invoke(() -> createReceiverInSecuredCache());
-    logger.info("Created receiver in vm3");
-
-    vm2.invoke(
-        () -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", "ln", isOffHeap()));
-    logger.info("Created RR in vm2");
-    vm3.invoke(
-        () -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", null, isOffHeap()));
-    logger.info("Created RR in vm3");
+    sender.invoke(() -> {
+      createSecuredCache(senderSecurityProps, senderJavaProperties, lnPort);
+      createReplicatedRegion(regionName, senderId, isOffHeap());
+      createSender(senderId, 2, false, 100, 10, false, false, null, true);
+    });
 
-    // Start sender
-    vm2.invoke(() -> WANTestBase.startSender("ln"));
+    receiver.invoke(() -> {
+      createSecuredReceiver(nyPort, regionName, receiverSecurityPropsWithIncorrectSenderCreds,
+          receiverJavaProps);
+    });
 
-    // Verify the sender is started
-    vm2.invoke(() -> verifySenderRunningState("ln"));
+    sender.invoke(() -> {
+      startSender(senderId);
+      doPutsAndVerifyQueue(regionName, numPuts, false, numPuts);
+    });
 
-    // Verify the sender is not connected
-    vm2.invoke(() -> verifySenderConnectedState("ln", false));
+    receiver.invoke(() -> validateRegionSize(regionName, 0));
   }
 
   /**
@@ -243,79 +220,220 @@ public class NewWanAuthenticationDUnitTest extends WANTestBase {
    * defect 44650.
    */
   @Test
-  public void testWanSecurityManagerWithInvalidCredentials() {
-    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
-    logger.info("Created locator on local site");
+  public void testWanSecurityManagerWithInvalidThenValidCredentials() {
+    final Properties senderSecurityProps = buildSecurityProperties("admin", "wrongPswd");
+
+    final String securityJsonResource =
+        "org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.testWanSecurityManagerWithInvalidCredentials.security.json";
+    final Properties receiverSecurityPropsWithCorrectSenderCreds =
+        buildSecurityProperties(securityJsonResource);
+    final Properties receiverSecurityPropsWithIncorrectSenderCreds = buildSecurityProperties();
+
+    // ------------------------------ Set Up With Invalid Credentials ------------------------------
+
+    sender.invoke(() -> {
+      createSecuredCache(senderSecurityProps, null, lnPort);
+      createReplicatedRegion(regionName, senderId, isOffHeap());
+      createSender(senderId, 2, false, 100, 10, false, false, null, true);
+    });
+
+    receiver.invoke(() -> {
+      createSecuredReceiver(nyPort, regionName, receiverSecurityPropsWithIncorrectSenderCreds,
+          null);
+    });
+
+    sender.invoke(() -> {
+      startSender(senderId);
+      doPutsAndVerifyQueue(regionName, numPuts, false, numPuts);
+    });
+
+    receiver.invoke(() -> validateRegionSize(regionName, 0));
+
+    // ------------------------------- Set Up With Valid Credentials -------------------------------
+
+    receiver.invoke(() -> {
+      closeCache();
+      createSecuredReceiver(nyPort, regionName, receiverSecurityPropsWithCorrectSenderCreds, null);
+    });
+
+    sender.invoke(() -> {
+      doPutsAndVerifyQueue(regionName, numPuts, true, 0);
+    });
+
+    receiver.invoke(() -> validateRegionSize(regionName, numPuts));
+  }
+
+  @Test
+  public void testWanSecurityManagerWithValidThenInvalidThenValidCredentials() {
+    final String securityJsonResource =
+        "org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.testWanSecurityManagerWithInvalidCredentials.security.json";
+    final String gatewayConnectionRetryIntervalConfigParameter =
+        DistributionConfig.GEMFIRE_PREFIX + "gateway-connection-retry-interval";
+
+    final Properties senderSecurityProps = buildSecurityProperties("admin", "wrongPswd");
+
+    final Properties receiverSecurityPropsWithCorrectSenderCreds =
+        buildSecurityProperties(securityJsonResource);
+    final Properties receiverSecurityPropsWithIncorrectSenderCreds = buildSecurityProperties();
+
+    // ------------------------------- Set Up With Valid Credentials -------------------------------
+
+    sender.invoke(() -> {
+      createSecuredCache(senderSecurityProps, null, lnPort);
+      createReplicatedRegion(regionName, senderId, isOffHeap());
+      try {
+        /*
+         * Setting the gateway-connection-retry-interval to 0 ensures that the Dispatcher and
+         * AckReader eagerly retry connections, so we may detect any issues with the retry logic
+         * early.
+         */
+        System.setProperty(gatewayConnectionRetryIntervalConfigParameter, "0");
+        createSender(senderId, 2, false, 100, 10, false, false, null, true);
+      } finally {
+        System.clearProperty(gatewayConnectionRetryIntervalConfigParameter);
+      }
+    });
+
+    receiver.invoke(() -> {
+      createSecuredReceiver(nyPort, regionName, receiverSecurityPropsWithCorrectSenderCreds, null);
+    });
+
+    sender.invoke(() -> {
+      startSender(senderId);
+      doPutsAndVerifyQueue(regionName, numPuts, true, 0);
+    });
+
+    receiver.invoke(() -> validateRegionSize(regionName, numPuts));
+
+    // ------------------------------ Set Up With Invalid Credentials ------------------------------
+
+    receiver.invoke(() -> {
+      // Simulate restarting the receiver, this time without valid credentials for the sender.
+      closeCache();
+      createSecuredReceiver(nyPort, regionName, receiverSecurityPropsWithIncorrectSenderCreds,
+          null);
+    });
+
+    sender.invoke(() -> {
+      doPutsAndVerifyQueue(regionName, numPuts, false, numPuts);
+    });
+
+    receiver.invoke(() -> validateRegionSize(regionName, 0));
+
+    // ------------------------------- Set Up With Valid Credentials -------------------------------
+
+    receiver.invoke(() -> {
+      closeCache();
+      // Simulate restarting the receiver, and restore valid credentials for the sender.
+      createSecuredReceiver(nyPort, regionName, receiverSecurityPropsWithCorrectSenderCreds, null);
+    });
+
+    sender.invoke(() -> {
+      // Data should be able to flow properly after valid credentials have been restored.
+      doPutsAndVerifyQueue(regionName, numPuts, true, 0);
+    });
+
+    receiver.invoke(() -> validateRegionSize(regionName, numPuts));
+  }
+
+  @Test
+  public void testWanAuthValidCredentialsWithServer() {
+    final DummyCredentialGenerator credentialGenerator = new DummyCredentialGenerator();
+    credentialGenerator.init();
+
+    final Properties extraProps = credentialGenerator.getSystemProperties();
+
+    final Properties senderCredentials = credentialGenerator.getValidCredentials(1);
+    if (extraProps != null) {
+      senderCredentials.putAll(extraProps);
+    }
+    final Properties senderJavaProps = credentialGenerator.getJavaProperties();
+
+    final Properties receiverCredentials = credentialGenerator.getInvalidCredentials(2);
+    if (extraProps != null) {
+      receiverCredentials.putAll(extraProps);
+    }
+    final Properties receiverJavaProps = credentialGenerator.getJavaProperties();
 
-    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
-    logger.info("Created locator on remote site");
+    final String clientAuthenticator = credentialGenerator.getAuthenticator();
+    final String clientAuthInit = UserPasswdAI.class.getName() + ".createAI";
 
-    Properties props1 = buildSecurityProperties("admin", "wrongPswd");
-    Properties props2 = buildSecurityProperties("guest", "wrongPswd");
+    final Properties senderSecurityProps =
+        buildProperties(clientAuthenticator, clientAuthInit, null, senderCredentials, null);
+    final Properties receiverSecurityProps =
+        buildProperties(clientAuthenticator, clientAuthInit, null, receiverCredentials, null);
 
-    logger.info("Done building auth properties");
+    // ------------------------------- Set Up With Valid Credentials -------------------------------
 
-    vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(props1, null, lnPort));
-    logger.info("Created secured cache in vm2");
+    sender.invoke(() -> createSecuredCache(senderSecurityProps, senderJavaProps, lnPort));
 
-    vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(props2, null, nyPort));
-    logger.info("Created secured cache in vm3");
+    receiver.invoke(() -> createSecuredCache(receiverSecurityProps, receiverJavaProps, nyPort));
 
-    String senderId = "ln";
-    vm2.invoke(
-        () -> WANTestBase.createSender(senderId, 2, false, 100, 10, false, false, null, true));
-    logger.info("Created sender in vm2");
+    sender.invoke(() -> createSender("ln", 2, false, 100, 10, false, false, null, true));
 
-    vm3.invoke(() -> createReceiverInSecuredCache());
-    logger.info("Created receiver in vm3");
+    receiver.invoke(() -> createReceiverInSecuredCache());
 
-    String regionName = getTestMethodName() + "_RR";
-    vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, senderId, isOffHeap()));
-    logger.info("Created RR in vm2");
-    vm3.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, isOffHeap()));
-    logger.info("Created RR in vm3");
+    sender.invoke(() -> {
+      startSender("ln");
+      waitForSenderRunningState("ln");
+      verifyDifferentServerInGetCredentialCall();
+    });
+
+    receiver.invoke(() -> verifyDifferentServerInGetCredentialCall());
+  }
 
-    // Start sender
-    vm2.invoke(() -> WANTestBase.startSender(senderId));
+  @Test
+  public void testWanSecurityManagerAuthValidCredentialsWithServer() {
+    Properties senderSecurityProps = buildSecurityProperties("admin", "secret");
+    Properties receiverSecurityProps = buildSecurityProperties("guest", "guest");
 
-    // Verify the sender is started
-    vm2.invoke(() -> verifySenderRunningState(senderId));
+    // ------------------------------- Set Up With Valid Credentials -------------------------------
 
-    // Verify the sender is not connected
-    vm2.invoke(() -> verifySenderConnectedState(senderId, false));
+    sender.invoke(() -> createSecuredCache(senderSecurityProps, null, lnPort));
 
-    // Do some puts in the sender
-    int numPuts = 10;
-    vm2.invoke(() -> WANTestBase.doPuts(regionName, numPuts));
+    receiver.invoke(() -> createSecuredCache(receiverSecurityProps, null, nyPort));
 
-    // Verify the sender is still started
-    vm2.invoke(() -> verifySenderRunningState(senderId));
+    sender.invoke(() -> createSender("ln", 2, false, 100, 10, false, false, null, true));
 
-    // Verify the sender is still not connected
-    vm2.invoke(() -> verifySenderConnectedState(senderId, false));
+    receiver.invoke(() -> createReceiverInSecuredCache());
 
-    // Verify the sender queue size
-    vm2.invoke(() -> testQueueSize(senderId, numPuts));
+    sender.invoke(() -> {
+      startSender("ln");
+      waitForSenderRunningState("ln");
+      verifyDifferentServerInGetCredentialCall();
+    });
 
-    // Stop the receiver
-    vm3.invoke(() -> closeCache());
+    // this would fail for now because for integrated security, we are not sending the receiver's
+    // credentials back
+    // to the sender. Because in the old security implementation, even though the receiver's
+    // credentials are sent back to the sender
+    // the sender is not checking it.
+    // receiver.invoke(() -> verifyDifferentServerInGetCredentialCall());
+  }
 
-    // Restart the receiver with a SecurityManager that accepts the existing sender's username and
-    // password. The
-    // NewWanAuthenticationDUnitTest.testWanSecurityManagerWithInvalidCredentials.security.json.
-    // file contains the admin user definition that the SecurityManager will accept.
-    String securityJsonRersource = "org/apache/geode/internal/cache/wan/misc/"
-        + getClass().getSimpleName() + "." + getTestMethodName() + ".security.json";
-    Properties propsRestart = buildSecurityProperties("guest", "guest", securityJsonRersource);
-    vm3.invoke(() -> createSecuredCache(propsRestart, null, nyPort));
-    vm3.invoke(() -> createReplicatedRegion(regionName, null, isOffHeap()));
-    vm3.invoke(() -> createReceiverInSecuredCache());
+  private void doPutsAndVerifyQueue(final String regionName, final int numPuts,
+      final boolean shouldBeConnected,
+      final int expectedQueueSize) {
+    doPuts(regionName, numPuts);
+    verifyRunningWithConnectedState(senderId, shouldBeConnected);
+    checkQueueSize(senderId, expectedQueueSize);
+  }
 
-    // Wait for the queue to drain
-    vm2.invoke(() -> checkQueueSize(senderId, 0));
+  private void createSecuredReceiver(Integer nyPort, String regionName,
+      Properties receiverSecurityPropsWithCorrectSenderCreds,
+      Object javaProps) {
+    createSecuredCache(receiverSecurityPropsWithCorrectSenderCreds, javaProps, nyPort);
+    createReplicatedRegion(regionName, null, isOffHeap());
+    createReceiverInSecuredCache();
+  }
 
-    // Verify region size on receiver
-    vm3.invoke(() -> validateRegionSize(regionName, numPuts));
+  private void verifyRunningWithConnectedState(
+      final String senderId,
+      final boolean shouldBeConnected) {
+    await().untilAsserted(() -> {
+      verifySenderRunningState(senderId);
+      verifySenderConnectedState(senderId, shouldBeConnected);
+    });
   }
 
   private static Properties buildProperties(String clientauthenticator, String clientAuthInit,
@@ -339,23 +457,29 @@ public class NewWanAuthenticationDUnitTest extends WANTestBase {
     return authProps;
   }
 
-  private static Properties buildSecurityProperties(String username, String password) {
-    return buildSecurityProperties(username, password,
-        "org/apache/geode/security/templates/security.json");
+  private static Properties buildSecurityProperties(final String username, final String password) {
+    final Properties props = buildSecurityProperties();
+    props.put("security-username", username);
+    props.put("security-password", password);
+    return props;
   }
 
-  private static Properties buildSecurityProperties(String username, String password,
-      String securityJsonResource) {
-    Properties props = new Properties();
-    props.put(SECURITY_MANAGER, TestSecurityManager.class.getName());
+  private static Properties buildSecurityProperties(
+      final String securityJsonResource) {
+    final Properties props = buildSecurityProperties();
     props.put("security-json", securityJsonResource);
+    return props;
+  }
+
+  private static Properties buildSecurityProperties() {
+    final Properties props = new Properties();
+    props.put(SECURITY_MANAGER, TestSecurityManager.class.getName());
     props.put(SECURITY_CLIENT_AUTH_INIT, UserPasswdAI.class.getName());
-    props.put("security-username", username);
-    props.put("security-password", password);
+    props.put("security-json", securityJsonResource);
     return props;
   }
 
-  public static void createSecuredCache(Properties authProps, Object javaProps, Integer locPort) {
+  private static void createSecuredCache(Properties authProps, Object javaProps, Integer locPort) {
     authProps.setProperty(MCAST_PORT, "0");
     authProps.setProperty(LOCATORS, "localhost[" + locPort + "]");
 
@@ -397,104 +521,9 @@ public class NewWanAuthenticationDUnitTest extends WANTestBase {
     }
   }
 
-  public static void verifyDifferentServerInGetCredentialCall() {
+  private static void verifyDifferentServerInGetCredentialCall() {
     Assert.assertTrue(isDifferentServerInGetCredentialCall,
         "verifyDifferentServerInGetCredentialCall: Server should be different");
     isDifferentServerInGetCredentialCall = false;
   }
-
-  @Test
-  public void testWanAuthValidCredentialsWithServer() {
-    disconnectAllFromDS();
-    {
-      Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
-      logger.info("Created locator on local site");
-
-      Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
-      logger.info("Created locator on remote site");
-
-      DummyCredentialGenerator gen = new DummyCredentialGenerator();
-      gen.init();
-      Properties extraProps = gen.getSystemProperties();
-
-      String clientauthenticator = gen.getAuthenticator();
-      String clientauthInit = UserPasswdAI.class.getName() + ".createAI";
-
-      Properties credentials1 = gen.getValidCredentials(1);
-      if (extraProps != null) {
-        credentials1.putAll(extraProps);
-      }
-      Properties javaProps1 = gen.getJavaProperties();
-
-      Properties credentials2 = gen.getInvalidCredentials(2);
-      if (extraProps != null) {
-        credentials2.putAll(extraProps);
-      }
-      Properties javaProps2 = gen.getJavaProperties();
-
-      Properties props1 =
-          buildProperties(clientauthenticator, clientauthInit, null, credentials1, null);
-      Properties props2 =
-          buildProperties(clientauthenticator, clientauthInit, null, credentials2, null);
-
-      vm2.invoke(
-          () -> NewWanAuthenticationDUnitTest.createSecuredCache(props1, javaProps1, lnPort));
-      logger.info("Created secured cache in vm2");
-
-      vm3.invoke(
-          () -> NewWanAuthenticationDUnitTest.createSecuredCache(props2, javaProps2, nyPort));
-      logger.info("Created secured cache in vm3");
-
-      vm2.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true));
-      logger.info("Created sender in vm2");
-
-      vm3.invoke(() -> createReceiverInSecuredCache());
-      logger.info("Created receiver in vm3");
-
-      vm2.invoke(() -> WANTestBase.startSender("ln"));
-      vm2.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
-
-      vm2.invoke(() -> verifyDifferentServerInGetCredentialCall());
-      vm3.invoke(() -> verifyDifferentServerInGetCredentialCall());
-    }
-  }
-
-  @Test
-  public void testWanSecurityManagerAuthValidCredentialsWithServer() {
-    disconnectAllFromDS();
-    {
-      Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
-      logger.info("Created locator on local site");
-
-      Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
-      logger.info("Created locator on remote site");
-
-      Properties props1 = buildSecurityProperties("admin", "secret");
-      Properties props2 = buildSecurityProperties("guest", "guest");
-
-      vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(props1, null, lnPort));
-      logger.info("Created secured cache in vm2");
-
-      vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(props2, null, nyPort));
-      logger.info("Created secured cache in vm3");
-
-      vm2.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true));
-      logger.info("Created sender in vm2");
-
-      vm3.invoke(() -> createReceiverInSecuredCache());
-      logger.info("Created receiver in vm3");
-
-      vm2.invoke(() -> WANTestBase.startSender("ln"));
-      vm2.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
-
-      vm2.invoke(() -> verifyDifferentServerInGetCredentialCall());
-
-      // this would fail for now because for integrated security, we are not sending the receiver's
-      // credentials back
-      // to the sender. Because in the old security implementation, even though the receiver's
-      // credentials are sent back to the sender
-      // the sender is not checking it.
-      // vm3.invoke(() -> verifyDifferentServerInGetCredentialCall());
-    }
-  }
 }
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
index b8ae5c9..6951e32 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
@@ -65,6 +65,8 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
    */
   private int failedConnectCount = 0;
 
+  private static final int RETRY_WAIT_TIME = 100;
+
   void setAckReaderThread(AckReaderThread ackReaderThread) {
     this.ackReaderThread = ackReaderThread;
   }
@@ -122,24 +124,10 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
       }
       if (this.sender.getProxy() == null || this.sender.getProxy().isDestroyed()) {
         // if our pool is shutdown then just be silent
-      } else if (ex instanceof IOException
-          || (ex instanceof ServerConnectivityException
-              && !(ex.getCause() instanceof PdxRegistryMismatchException))
-          || ex instanceof ConnectionDestroyedException) {
-        // If the cause is an IOException or a ServerException, sleep and retry.
-        // Sleep for a bit and recheck.
-        try {
-          Thread.sleep(100);
-        } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt();
-        }
+      } else if (RecoverableExceptionPredicates.isRecoverableWhenReadingAck(ex)) {
+        sleepBeforeRetry();
       } else {
-        if (!(ex instanceof CancelException)) {
-          logger.fatal(
-              "Stopping the processor because the following exception occurred while processing a batch:",
-              ex);
-        }
-        this.processor.setIsStopped(true);
+        logAndStopProcessor(ex);
       }
     }
     return ack;
@@ -156,43 +144,25 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
         statistics.endBatch(start, events.size());
       }
     } catch (GatewaySenderException ge) {
-
       Throwable t = ge.getCause();
       if (this.sender.getProxy() == null || this.sender.getProxy().isDestroyed()) {
         // if our pool is shutdown then just be silent
-      } else if (t instanceof IOException || t instanceof ServerConnectivityException
-          || t instanceof ConnectionDestroyedException || t instanceof IllegalStateException
-          || t instanceof GemFireSecurityException) {
+      } else if (RecoverableExceptionPredicates.isRecoverableWhenDispatchingBatch(t)) {
         this.processor.handleException();
-        // If the cause is an IOException or a ServerException, sleep and retry.
-        // Sleep for a bit and recheck.
-        try {
-          Thread.sleep(100);
-        } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt();
-        }
+        sleepBeforeRetry();
         if (logger.isDebugEnabled()) {
-          logger.debug("Because of IOException, failed to dispatch a batch with id : {}",
-              this.processor.getBatchId());
+          logger.debug(
+              "Failed to dispatch a batch with id {} due to non-fatal exception {}.  Retrying in {} ms",
+              this.processor.getBatchId(), t, RETRY_WAIT_TIME);
         }
       } else {
-        logger.fatal(
-            "Stopping the processor because the following exception occurred while processing a batch:",
-            ge);
-        this.processor.setIsStopped(true);
+        logAndStopProcessor(ge);
       }
     } catch (CancelException e) {
-      if (logger.isDebugEnabled()) {
-        logger
-            .debug("Stopping the processor because cancellation occurred while processing a batch");
-      }
-      this.processor.setIsStopped(true);
+      logAndStopProcessor(e);
       throw e;
     } catch (Exception e) {
-      this.processor.setIsStopped(true);
-      logger.fatal(
-          "Stopping the processor because the following exception occurred while processing a batch:",
-          e);
+      logAndStopProcessor(e);
     }
     return success;
   }
@@ -826,4 +796,84 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
       destroyConnection();
     }
   }
+
+  private void sleepBeforeRetry() {
+    try {
+      Thread.sleep(RETRY_WAIT_TIME);
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  private void logAndStopProcessor(final Exception ex) {
+    if (ex instanceof CancelException) {
+      if (logger.isDebugEnabled()) {
+        logger
+            .debug("Stopping the processor because cancellation occurred while processing a batch");
+      }
+    } else {
+      logger.fatal(
+          "Stopping the processor because the following exception occurred while processing a batch:",
+          ex);
+    }
+    this.processor.setIsStopped(true);
+  }
+
+  private static class RecoverableExceptionPredicates {
+
+    /**
+     * Certain exception types are considered recoverable when reading an acknowledgement.
+     */
+    static boolean isRecoverableWhenReadingAck(final Exception ex) {
+      /**
+       * It is considered non-recoverable if the PDX registry files are deleted from the sending
+       * side of a WAN Gateway. This is determined by checking whether the cause of the
+       * {@link ServerConnectivityException} is a {@link PdxRegistryMismatchException}.
+       */
+      return isRecoverableInAllCases(ex)
+          || (ex instanceof ServerConnectivityException
+              && !(ex.getCause() instanceof PdxRegistryMismatchException));
+    }
+
+    /**
+     * Certain exception types are considered recoverable when dispatching a batch.
+     */
+    static boolean isRecoverableWhenDispatchingBatch(final Throwable t) {
+      /**
+       * We consider {@link ServerConnectivityException} to be a temporary connectivity issue and
+       * therefore recoverable. The {@link IllegalStateException} can occur if off-heap is used,
+       * and a GatewaySenderEventImpl is serialized after being freed. This can happen if the
+       * region is destroyed concurrently while the gateway sender event is being processed.
+       */
+      return isRecoverableInAllCases(t)
+          || t instanceof ServerConnectivityException
+          || t instanceof IllegalStateException;
+    }
+
+    /**
+     * Certain exception types are considered recoverable when either dispatching a batch or
+     * reading an acknowledgement.
+     */
+    private static boolean isRecoverableInAllCases(final Throwable t) {
+      /**
+       * {@link IOException} and {@link ConnectionDestroyedException} can occur
+       * due to temporary network issues and therefore are recoverable.
+       * {@link GemFireSecurityException} represents an inability to authenticate with the
+       * gateway receiver.
+       *
+       * By treating {@link GemFireSecurityException} as recoverable we are continuing to retry
+       * in a couple situations:
+       *
+       * <ul>
+       * <li>The implementation of the {@link SecurityManager} loses connectivity to the actual
+       * authentication authority e.g. Active Directory (expecting that connectivity will
+       * later be restored)</li>
+       * <li>Credentials are invalid (expecting that they will later become valid)</li>
+       * </ul>
+       */
+      return t instanceof IOException
+          || t instanceof ConnectionDestroyedException
+          || t instanceof GemFireSecurityException;
+    }
+  }
 }