You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by do...@apache.org on 2021/02/04 22:56:57 UTC

[geode] branch support/1.12 updated: GEODE-8890: Catch ClassCastException in LocatorDiscovery.java (#6000)

This is an automated email from the ASF dual-hosted git repository.

donalevans pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.12 by this push:
     new 0c735c5  GEODE-8890: Catch ClassCastException in LocatorDiscovery.java (#6000)
0c735c5 is described below

commit 0c735c5340fe8df54705fa346c75b476af452b93
Author: Donal Evans <do...@pivotal.io>
AuthorDate: Thu Feb 4 13:38:03 2021 -0800

    GEODE-8890: Catch ClassCastException in LocatorDiscovery.java (#6000)
    
    - Handle ClassCastException the same way as ClassNotFoundException
    - Added unit tests for LocatorDiscovery
    - Modify log messages to be clearer
    - Clean up LocatorDiscovery static analyzer warnings
    
    Authored-by: Donal Evans <do...@vmware.com>
    (cherry picked from commit f4423bb0649ca1d7c4b38bb4915ad69fe461dd01)
---
 .../internal/locator/wan/LocatorDiscovery.java     | 137 ++++----
 .../internal/locator/wan/LocatorDiscoveryTest.java | 353 +++++++++++++++++++++
 2 files changed, 431 insertions(+), 59 deletions(-)

diff --git a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorDiscovery.java b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorDiscovery.java
index ce1c7e2..3c3dedf 100644
--- a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorDiscovery.java
+++ b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorDiscovery.java
@@ -15,11 +15,14 @@
 package org.apache.geode.cache.client.internal.locator.wan;
 
 
+import static org.apache.geode.distributed.internal.WanLocatorDiscoverer.WAN_LOCATOR_CONNECTION_TIMEOUT;
+
 import java.io.IOException;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.logging.log4j.Logger;
 
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.distributed.internal.WanLocatorDiscoverer;
 import org.apache.geode.distributed.internal.tcpserver.TcpClient;
 import org.apache.geode.internal.InternalDataSerializer;
@@ -31,8 +34,8 @@ import org.apache.geode.logging.internal.log4j.api.LogService;
 import org.apache.geode.util.internal.GeodeGlossary;
 
 /**
- * This class represent a runnable task which exchange the locator information with local
- * locators(within the site) as well as remote locators (across the site)
+ * This class represents a runnable task which exchanges the locator information with local
+ * locators (within the site) as well as remote locators (across the site)
  *
  * @since GemFire 7.0
  */
@@ -40,24 +43,28 @@ public class LocatorDiscovery {
 
   private static final Logger logger = LogService.getLogger();
 
-  private WanLocatorDiscoverer discoverer;
+  private final WanLocatorDiscoverer discoverer;
 
-  private DistributionLocatorId locatorId;
+  private final DistributionLocatorId locatorId;
 
-  private LocatorMembershipListener locatorListener;
+  private final LocatorMembershipListener locatorListener;
 
   RemoteLocatorJoinRequest request;
 
   TcpClient locatorClient;
 
   public static final int WAN_LOCATOR_CONNECTION_RETRY_ATTEMPT =
-      Integer.getInteger("WANLocator.CONNECTION_RETRY_ATTEMPT", 50000).intValue();
+      Integer.getInteger("WANLocator.CONNECTION_RETRY_ATTEMPT", 50000);
 
   public static final int WAN_LOCATOR_CONNECTION_INTERVAL =
-      Integer.getInteger("WANLocator.CONNECTION_INTERVAL", 10000).intValue();
+      Integer.getInteger("WANLocator.CONNECTION_INTERVAL", 10000);
 
   public static final int WAN_LOCATOR_PING_INTERVAL =
-      Integer.getInteger("WANLocator.PING_INTERVAL", 10000).intValue();
+      Integer.getInteger("WANLocator.PING_INTERVAL", 10000);
+
+  // For testing. When true, Thread.sleep() is not called in exchangeLocalLocators() or
+  // exchangeRemoteLocators()
+  private final boolean skipWaiting;
 
   public LocatorDiscovery(WanLocatorDiscoverer discoverer, DistributionLocatorId locator,
       RemoteLocatorJoinRequest request, LocatorMembershipListener locatorListener) {
@@ -69,6 +76,20 @@ public class LocatorDiscovery {
         .getSocketCreatorForComponent(SecurableCommunicationChannel.LOCATOR),
         InternalDataSerializer.getDSFIDSerializer().getObjectSerializer(),
         InternalDataSerializer.getDSFIDSerializer().getObjectDeserializer());
+    this.skipWaiting = false;
+  }
+
+  // Test constructor
+  @VisibleForTesting
+  LocatorDiscovery(WanLocatorDiscoverer discoverer, DistributionLocatorId locator,
+      RemoteLocatorJoinRequest request, LocatorMembershipListener locatorListener,
+      TcpClient locatorClient) {
+    this.discoverer = discoverer;
+    this.locatorId = locator;
+    this.request = request;
+    this.locatorListener = locatorListener;
+    this.locatorClient = locatorClient;
+    this.skipWaiting = true;
   }
 
   /**
@@ -76,7 +97,7 @@ public class LocatorDiscovery {
    * swamp the logs in retries due to same batch failures.
    */
   private final ConcurrentHashMap<DistributionLocatorId, long[]> failureLogInterval =
-      new ConcurrentHashMap<DistributionLocatorId, long[]>();
+      new ConcurrentHashMap<>();
 
   /**
    * The maximum size of {@link #failureLogInterval} beyond which it will start logging all failure
@@ -93,10 +114,10 @@ public class LocatorDiscovery {
 
   public boolean skipFailureLogging(DistributionLocatorId locatorId) {
     boolean skipLogging = false;
-    if (this.failureLogInterval.size() < FAILURE_MAP_MAXSIZE) {
-      long[] logInterval = this.failureLogInterval.get(locatorId);
+    if (failureLogInterval.size() < FAILURE_MAP_MAXSIZE) {
+      long[] logInterval = failureLogInterval.get(locatorId);
       if (logInterval == null) {
-        logInterval = this.failureLogInterval.putIfAbsent(locatorId,
+        logInterval = failureLogInterval.putIfAbsent(locatorId,
             new long[] {System.currentTimeMillis(), 1000});
       }
       if (logInterval != null) {
@@ -129,21 +150,17 @@ public class LocatorDiscovery {
     }
   }
 
-  private WanLocatorDiscoverer getDiscoverer() {
-    return this.discoverer;
-  }
-
   private void exchangeLocalLocators() {
     int retryAttempt = 1;
-    while (!getDiscoverer().isStopped()) {
+    while (!discoverer.isStopped()) {
       try {
-        RemoteLocatorJoinResponse response =
-            (RemoteLocatorJoinResponse) locatorClient.requestToServer(locatorId.getHost(), request,
-                WanLocatorDiscoverer.WAN_LOCATOR_CONNECTION_TIMEOUT, true);
+        RemoteLocatorJoinResponse response = (RemoteLocatorJoinResponse) locatorClient
+            .requestToServer(locatorId.getHost(), request, WAN_LOCATOR_CONNECTION_TIMEOUT, true);
         if (response != null) {
-          LocatorHelper.addExchangedLocators(response.getLocators(), this.locatorListener);
-          logger.info("Locator discovery task exchanged locator information {} with {}: {}.",
-              new Object[] {request.getLocator(), locatorId, response.getLocators()});
+          addExchangedLocators(response);
+          logger.info(
+              "Locator discovery task for locator {} exchanged locator information with {}: {}.",
+              request.getLocator(), locatorId, response.getLocators());
           break;
         }
       } catch (IOException ioe) {
@@ -151,28 +168,26 @@ public class LocatorDiscovery {
           ConnectionException coe =
               new ConnectionException("Not able to connect to local locator after "
                   + WAN_LOCATOR_CONNECTION_RETRY_ATTEMPT + " retry attempts", ioe);
-          logger.fatal(String.format(
-              "Locator discovery task could not exchange locator information %s with %s after %s retry attempts.",
-              new Object[] {request.getLocator(), locatorId, retryAttempt}),
-              coe);
+          logger.fatal(
+              "Locator discovery task for locator {} could not exchange locator information with {} after {} retry attempts.",
+              request.getLocator(), locatorId, retryAttempt, coe);
           break;
         }
         if (skipFailureLogging(locatorId)) {
           logger.warn(
-              "Locator discovery task could not exchange locator information {} with {} after {} retry attempts. Retrying in {} ms.",
-              new Object[] {request.getLocator(), locatorId, retryAttempt,
-                  WAN_LOCATOR_CONNECTION_INTERVAL});
+              "Locator discovery task for locator {} could not exchange locator information with {} after {} retry attempts. Retrying in {} ms.",
+              request.getLocator(), locatorId, retryAttempt, WAN_LOCATOR_CONNECTION_INTERVAL);
         }
         try {
-          Thread.sleep(WAN_LOCATOR_CONNECTION_INTERVAL);
+          if (!skipWaiting) {
+            Thread.sleep(WAN_LOCATOR_CONNECTION_INTERVAL);
+          }
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
         }
         retryAttempt++;
-        continue;
-      } catch (ClassNotFoundException classNotFoundException) {
-        logger.fatal("Locator discovery task encountred unexpected exception",
-            classNotFoundException);
+      } catch (ClassNotFoundException | ClassCastException ex) {
+        logger.fatal("Locator discovery task encountered unexpected exception", ex);
         break;
       }
     }
@@ -180,23 +195,23 @@ public class LocatorDiscovery {
 
   public void exchangeRemoteLocators() {
     int retryAttempt = 1;
-    DistributionLocatorId remoteLocator = this.locatorId;
-    while (!getDiscoverer().isStopped()) {
-      RemoteLocatorJoinResponse response;
+    while (!discoverer.isStopped()) {
       try {
-        response =
-            (RemoteLocatorJoinResponse) locatorClient.requestToServer(remoteLocator.getHost(),
-                request, WanLocatorDiscoverer.WAN_LOCATOR_CONNECTION_TIMEOUT, true);
+        RemoteLocatorJoinResponse response = (RemoteLocatorJoinResponse) locatorClient
+            .requestToServer(locatorId.getHost(), request, WAN_LOCATOR_CONNECTION_TIMEOUT, true);
         if (response != null) {
-          LocatorHelper.addExchangedLocators(response.getLocators(), this.locatorListener);
-          logger.info("Locator discovery task exchanged locator information {} with {}: {}.",
-              new Object[] {request.getLocator(), locatorId, response.getLocators()});
+          addExchangedLocators(response);
+          logger.info(
+              "Locator discovery task for locator {} exchanged locator information with {}: {}.",
+              request.getLocator(), locatorId, response.getLocators());
           RemoteLocatorPingRequest pingRequest = new RemoteLocatorPingRequest("");
           while (true) {
-            Thread.sleep(WAN_LOCATOR_PING_INTERVAL);
+            if (!skipWaiting) {
+              Thread.sleep(WAN_LOCATOR_PING_INTERVAL);
+            }
             RemoteLocatorPingResponse pingResponse =
-                (RemoteLocatorPingResponse) locatorClient.requestToServer(remoteLocator.getHost(),
-                    pingRequest, WanLocatorDiscoverer.WAN_LOCATOR_CONNECTION_TIMEOUT, true);
+                (RemoteLocatorPingResponse) locatorClient.requestToServer(locatorId.getHost(),
+                    pingRequest, WAN_LOCATOR_CONNECTION_TIMEOUT, true);
             if (pingResponse != null) {
               continue;
             }
@@ -205,28 +220,27 @@ public class LocatorDiscovery {
         }
       } catch (IOException ioe) {
         if (retryAttempt == WAN_LOCATOR_CONNECTION_RETRY_ATTEMPT) {
-          logger.fatal(String.format(
-              "Locator discovery task could not exchange locator information %s with %s after %s retry attempts.",
-              new Object[] {request.getLocator(), remoteLocator, retryAttempt}),
-              ioe);
+          logger.fatal(
+              "Locator discovery task for locator {} could not exchange locator information with {} after {} retry attempts.",
+              request.getLocator(), locatorId, retryAttempt, ioe);
           break;
         }
-        if (skipFailureLogging(remoteLocator)) {
+        if (skipFailureLogging(locatorId)) {
           logger.warn(
-              "Locator discovery task could not exchange locator information {} with {} after {} retry attempts. Retrying in {} ms.",
-              new Object[] {request.getLocator(), remoteLocator, retryAttempt,
+              "Locator discovery task for locator {} could not exchange locator information with {} after {} retry attempts. Retrying in {} ms.",
+              new Object[] {request.getLocator(), locatorId, retryAttempt,
                   WAN_LOCATOR_CONNECTION_INTERVAL});
         }
         try {
-          Thread.sleep(WAN_LOCATOR_CONNECTION_INTERVAL);
+          if (!skipWaiting) {
+            Thread.sleep(WAN_LOCATOR_CONNECTION_INTERVAL);
+          }
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
         }
         retryAttempt++;
-        continue;
-      } catch (ClassNotFoundException classNotFoundException) {
-        logger.fatal("Locator discovery task encountred unexpected exception",
-            classNotFoundException);
+      } catch (ClassNotFoundException | ClassCastException ex) {
+        logger.fatal("Locator discovery task encountered unexpected exception", ex);
         break;
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
@@ -234,4 +248,9 @@ public class LocatorDiscovery {
     }
   }
 
+  @VisibleForTesting
+  void addExchangedLocators(RemoteLocatorJoinResponse response) {
+    LocatorHelper.addExchangedLocators(response.getLocators(), locatorListener);
+  }
+
 }
diff --git a/geode-wan/src/test/java/org/apache/geode/cache/client/internal/locator/wan/LocatorDiscoveryTest.java b/geode-wan/src/test/java/org/apache/geode/cache/client/internal/locator/wan/LocatorDiscoveryTest.java
new file mode 100644
index 0000000..f3e9133
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/cache/client/internal/locator/wan/LocatorDiscoveryTest.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.client.internal.locator.wan;
+
+import static org.apache.geode.internal.Assert.assertTrue;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.distributed.internal.WanLocatorDiscoverer;
+import org.apache.geode.distributed.internal.tcpserver.TcpClient;
+import org.apache.geode.internal.admin.remote.DistributionLocatorId;
+
+public class LocatorDiscoveryTest {
+  private DistributionLocatorId locatorId;
+  private RemoteLocatorJoinRequest request;
+  private LocatorDiscovery locatorDiscovery;
+  private TcpClient locatorClient;
+  private WanLocatorDiscoverer discoverer;
+  private LocatorDiscovery.LocalLocatorDiscovery localLocatorDiscovery;
+  private LocatorDiscovery.RemoteLocatorDiscovery remoteLocatorDiscovery;
+
+  @Before
+  public void setUp() {
+    discoverer = mock(WanLocatorDiscoverer.class);
+    locatorId = mock(DistributionLocatorId.class);
+    request = mock(RemoteLocatorJoinRequest.class);
+    LocatorMembershipListener listener = mock(LocatorMembershipListener.class);
+    locatorClient = mock(TcpClient.class);
+    locatorDiscovery =
+        spy(new LocatorDiscovery(discoverer, locatorId, request, listener, locatorClient));
+    localLocatorDiscovery = locatorDiscovery.new LocalLocatorDiscovery();
+    remoteLocatorDiscovery = locatorDiscovery.new RemoteLocatorDiscovery();
+  }
+
+  @Test
+  public void skipFailureLoggingReturnsCorrectly() {
+    // First call should always be false
+    assertFalse(locatorDiscovery.skipFailureLogging(locatorId));
+    long firstReturnedFalse = System.currentTimeMillis();
+
+    // Next calls should only be false if more than 1000ms has passed since the last call that
+    // returned false
+    assertTrue(locatorDiscovery.skipFailureLogging(locatorId));
+    assertTrue(locatorDiscovery.skipFailureLogging(locatorId));
+    await().until(() -> System.currentTimeMillis() - firstReturnedFalse > 1000);
+    assertFalse(locatorDiscovery.skipFailureLogging(locatorId));
+    long secondReturnedFalse = System.currentTimeMillis();
+
+    // Next calls should only be false if more than 2000ms has passed since the last call that
+    // returned false
+    assertTrue(locatorDiscovery.skipFailureLogging(locatorId));
+    await().until(() -> System.currentTimeMillis() - secondReturnedFalse > 2000);
+    assertFalse(locatorDiscovery.skipFailureLogging(locatorId));
+  }
+
+  @Test
+  public void localLocatorDiscoveryDoesNothingWhenDiscovererIsStopped() {
+    when(discoverer.isStopped()).thenReturn(true);
+    localLocatorDiscovery.run();
+    verifyZeroInteractions(locatorClient);
+  }
+
+  @Test
+  public void localLocatorDiscoveryStopsWithNonNullResponse()
+      throws IOException, ClassNotFoundException {
+    // Only allow two retries before stopping the locator discoverer
+    when(discoverer.isStopped()).thenReturn(false).thenReturn(false).thenReturn(true);
+    RemoteLocatorJoinResponse response = mock(RemoteLocatorJoinResponse.class);
+    when(locatorClient.requestToServer(any(), eq(request), anyInt(), anyBoolean()))
+        .thenReturn(response);
+    doNothing().when(locatorDiscovery).addExchangedLocators(response);
+
+    localLocatorDiscovery.run();
+
+    // Confirm that we stopped after the first response was received
+    verify(locatorDiscovery, times(1)).addExchangedLocators(response);
+    verify(locatorClient, times(1)).requestToServer(any(), eq(request), anyInt(), anyBoolean());
+  }
+
+  @Test
+  public void localLocatorDiscoveryRetriesWithNullResponse()
+      throws IOException, ClassNotFoundException {
+    // Only allow two retries before stopping the locator discoverer
+    when(discoverer.isStopped()).thenReturn(false).thenReturn(false).thenReturn(true);
+    when(locatorClient.requestToServer(any(), eq(request), anyInt(), anyBoolean()))
+        .thenReturn(null);
+
+    localLocatorDiscovery.run();
+
+    // Confirm that we retried each time the response was null
+    verify(locatorDiscovery, times(0)).addExchangedLocators(any());
+    verify(locatorClient, times(2)).requestToServer(any(), eq(request), anyInt(), anyBoolean());
+  }
+
+  @Test
+  public void localLocatorRetriesWhenIOException() throws IOException, ClassNotFoundException {
+    // Only allow two retries before stopping the locator discoverer
+    when(discoverer.isStopped()).thenReturn(false).thenReturn(false).thenReturn(true);
+    when(locatorClient.requestToServer(any(), eq(request), anyInt(), anyBoolean()))
+        .thenThrow(new IOException());
+
+    localLocatorDiscovery.run();
+
+    // Confirm that we retried after the first exception was thrown
+    verify(locatorDiscovery, times(0)).addExchangedLocators(any());
+    verify(locatorClient, times(2)).requestToServer(any(), eq(request), anyInt(), anyBoolean());
+  }
+
+  @Test
+  public void localLocatorDoesNotRetryWhenClassNotFoundException()
+      throws IOException, ClassNotFoundException {
+    // Only allow two retries before stopping the locator discoverer
+    when(discoverer.isStopped()).thenReturn(false).thenReturn(false).thenReturn(true);
+    when(locatorClient.requestToServer(any(), eq(request), anyInt(), anyBoolean()))
+        .thenThrow(new ClassNotFoundException());
+
+    localLocatorDiscovery.run();
+
+    // Confirm that we did not retry after the first exception
+    verify(locatorDiscovery, times(0)).addExchangedLocators(any());
+    verify(locatorClient, times(1)).requestToServer(any(), eq(request), anyInt(), anyBoolean());
+  }
+
+  @Test
+  public void localLocatorDoesNotRetryWhenClassCastException()
+      throws IOException, ClassNotFoundException {
+    // Only allow two retries before stopping the locator discoverer
+    when(discoverer.isStopped()).thenReturn(false).thenReturn(false).thenReturn(true);
+    when(locatorClient.requestToServer(any(), eq(request), anyInt(), anyBoolean()))
+        .thenThrow(new ClassCastException());
+
+    localLocatorDiscovery.run();
+
+    // Confirm that we did not retry after the first exception
+    verify(locatorDiscovery, times(0)).addExchangedLocators(any());
+    verify(locatorClient, times(1)).requestToServer(any(), eq(request), anyInt(), anyBoolean());
+  }
+
+  @Test
+  public void remoteLocatorDiscoveryDoesNothingWhenDiscovererIsStopped() {
+    when(discoverer.isStopped()).thenReturn(true);
+    remoteLocatorDiscovery.run();
+    verifyZeroInteractions(locatorClient);
+  }
+
+  @Test
+  public void remoteLocatorDiscoveryPingsRemoteWhenJoinResponseIsNotNull()
+      throws IOException, ClassNotFoundException {
+    // Only allow one attempt before stopping the locator discoverer
+    when(discoverer.isStopped()).thenReturn(false).thenReturn(true);
+    RemoteLocatorJoinResponse joinResponse = mock(RemoteLocatorJoinResponse.class);
+    when(locatorClient.requestToServer(any(), eq(request), anyInt(), anyBoolean()))
+        .thenReturn(joinResponse);
+    doNothing().when(locatorDiscovery).addExchangedLocators(joinResponse);
+
+    // Return null to prevent the ping loop continuing forever
+    when(locatorClient.requestToServer(any(), any(RemoteLocatorPingRequest.class), anyInt(),
+        anyBoolean())).thenReturn(null);
+
+    remoteLocatorDiscovery.run();
+
+    // Confirm that we sent a ping request after the first joinResponse was received
+    verify(locatorDiscovery, times(1)).addExchangedLocators(joinResponse);
+    verify(locatorClient, times(1)).requestToServer(any(), eq(request), anyInt(), anyBoolean());
+    verify(locatorClient, times(1)).requestToServer(any(), any(RemoteLocatorPingRequest.class),
+        anyInt(), anyBoolean());
+  }
+
+  @Test
+  public void remoteLocatorDiscoveryRetriesPingRemoteWhenJoinResponseIsNotNullAndPingResponseIsNotNull()
+      throws IOException, ClassNotFoundException {
+    // Only allow one attempt before stopping the locator discoverer
+    when(discoverer.isStopped()).thenReturn(false).thenReturn(true);
+    RemoteLocatorJoinResponse joinResponse = mock(RemoteLocatorJoinResponse.class);
+    when(locatorClient.requestToServer(any(), eq(request), anyInt(), anyBoolean()))
+        .thenReturn(joinResponse);
+    doNothing().when(locatorDiscovery).addExchangedLocators(joinResponse);
+
+    RemoteLocatorPingResponse pingResponse = mock(RemoteLocatorPingResponse.class);
+    // Return a non-null RemoteLocatorPingResponse, then return null to prevent the ping loop
+    // continuing forever
+    when(locatorClient.requestToServer(any(), any(RemoteLocatorPingRequest.class), anyInt(),
+        anyBoolean())).thenReturn(pingResponse).thenReturn(null);
+
+    remoteLocatorDiscovery.run();
+
+    // Confirm that we retried pinging the remote locator
+    verify(locatorDiscovery, times(1)).addExchangedLocators(joinResponse);
+    verify(locatorClient, times(1)).requestToServer(any(), eq(request), anyInt(), anyBoolean());
+    verify(locatorClient, times(2)).requestToServer(any(), any(RemoteLocatorPingRequest.class),
+        anyInt(), anyBoolean());
+  }
+
+  @Test
+  public void remoteLocatorDiscoveryRetriesWithNullJoinResponse()
+      throws IOException, ClassNotFoundException {
+    // Only allow two retries before stopping the locator discoverer
+    when(discoverer.isStopped()).thenReturn(false).thenReturn(false).thenReturn(true);
+    when(locatorClient.requestToServer(any(), eq(request), anyInt(), anyBoolean()))
+        .thenReturn(null);
+
+    remoteLocatorDiscovery.run();
+
+    // Confirm that we retried each time the response was null
+    verify(locatorDiscovery, times(0)).addExchangedLocators(any());
+    verify(locatorClient, times(2)).requestToServer(any(), eq(request), anyInt(), anyBoolean());
+    verify(locatorClient, times(0)).requestToServer(any(), any(RemoteLocatorPingRequest.class),
+        anyInt(), anyBoolean());
+  }
+
+  @Test
+  public void remoteLocatorRetriesWhenIOExceptionWhenSendingRemoteLocatorJoinRequest()
+      throws IOException, ClassNotFoundException {
+    // Only allow two retries before stopping the locator discoverer
+    when(discoverer.isStopped()).thenReturn(false).thenReturn(false).thenReturn(true);
+    when(locatorClient.requestToServer(any(), eq(request), anyInt(), anyBoolean()))
+        .thenThrow(new IOException());
+
+    remoteLocatorDiscovery.run();
+
+    // Confirm that we retried after the first exception was thrown
+    verify(locatorDiscovery, times(0)).addExchangedLocators(any());
+    verify(locatorClient, times(2)).requestToServer(any(), eq(request), anyInt(), anyBoolean());
+  }
+
+  @Test
+  public void remoteLocatorRetriesWhenIOExceptionWhenSendingRemoteLocatorPingRequest()
+      throws IOException, ClassNotFoundException {
+    // Only allow two retries before stopping the locator discoverer
+    when(discoverer.isStopped()).thenReturn(false).thenReturn(false).thenReturn(true);
+    RemoteLocatorJoinResponse joinResponse = mock(RemoteLocatorJoinResponse.class);
+    when(locatorClient.requestToServer(any(), eq(request), anyInt(), anyBoolean()))
+        .thenReturn(joinResponse);
+    doNothing().when(locatorDiscovery).addExchangedLocators(joinResponse);
+
+    when(locatorClient.requestToServer(any(), any(RemoteLocatorPingRequest.class), anyInt(),
+        anyBoolean())).thenThrow(new IOException());
+
+    remoteLocatorDiscovery.run();
+
+    // Confirm that we retried after the first exception was thrown
+    verify(locatorDiscovery, times(2)).addExchangedLocators(any());
+    verify(locatorClient, times(2)).requestToServer(any(), eq(request), anyInt(), anyBoolean());
+    verify(locatorClient, times(2)).requestToServer(any(), any(RemoteLocatorPingRequest.class),
+        anyInt(), anyBoolean());
+  }
+
+  @Test
+  public void remoteLocatorDoesNotRetryWhenClassNotFoundExceptionFromJoinRequest()
+      throws IOException, ClassNotFoundException {
+    // Only allow two retries before stopping the locator discoverer
+    when(discoverer.isStopped()).thenReturn(false).thenReturn(false).thenReturn(true);
+    when(locatorClient.requestToServer(any(), eq(request), anyInt(), anyBoolean()))
+        .thenThrow(new ClassNotFoundException());
+
+    remoteLocatorDiscovery.run();
+
+    // Confirm that we did not retry after the first exception
+    verify(locatorDiscovery, times(0)).addExchangedLocators(any());
+    verify(locatorClient, times(1)).requestToServer(any(), eq(request), anyInt(), anyBoolean());
+    verify(locatorClient, times(0)).requestToServer(any(), any(RemoteLocatorPingRequest.class),
+        anyInt(), anyBoolean());
+  }
+
+  @Test
+  public void remoteLocatorDoesNotRetryWhenClassNotFoundExceptionFromPingRequest()
+      throws IOException, ClassNotFoundException {
+    // Only allow two retries before stopping the locator discoverer
+    when(discoverer.isStopped()).thenReturn(false).thenReturn(false).thenReturn(true);
+    RemoteLocatorJoinResponse joinResponse = mock(RemoteLocatorJoinResponse.class);
+    when(locatorClient.requestToServer(any(), eq(request), anyInt(), anyBoolean()))
+        .thenReturn(joinResponse);
+    doNothing().when(locatorDiscovery).addExchangedLocators(joinResponse);
+
+    when(locatorClient.requestToServer(any(), any(RemoteLocatorPingRequest.class), anyInt(),
+        anyBoolean())).thenThrow(new ClassNotFoundException());
+
+    remoteLocatorDiscovery.run();
+
+    // Confirm that we did not retry after the first exception
+    verify(locatorDiscovery, times(1)).addExchangedLocators(any());
+    verify(locatorClient, times(1)).requestToServer(any(), eq(request), anyInt(), anyBoolean());
+    verify(locatorClient, times(1)).requestToServer(any(), any(RemoteLocatorPingRequest.class),
+        anyInt(), anyBoolean());
+  }
+
+  @Test
+  public void remoteLocatorDoesNotRetryWhenClassCastExceptionFromJoinRequest()
+      throws IOException, ClassNotFoundException {
+    // Only allow two retries before stopping the locator discoverer
+    when(discoverer.isStopped()).thenReturn(false).thenReturn(false).thenReturn(true);
+    when(locatorClient.requestToServer(any(), eq(request), anyInt(), anyBoolean()))
+        .thenThrow(new ClassCastException());
+
+    remoteLocatorDiscovery.run();
+
+    // Confirm that we did not retry after the first exception
+    verify(locatorDiscovery, times(0)).addExchangedLocators(any());
+    verify(locatorClient, times(1)).requestToServer(any(), eq(request), anyInt(), anyBoolean());
+    verify(locatorClient, times(0)).requestToServer(any(), any(RemoteLocatorPingRequest.class),
+        anyInt(), anyBoolean());
+  }
+
+  @Test
+  public void remoteLocatorDoesNotRetryWhenClassCastExceptionFromPingRequest()
+      throws IOException, ClassNotFoundException {
+    // Only allow two retries before stopping the locator discoverer
+    when(discoverer.isStopped()).thenReturn(false).thenReturn(false).thenReturn(true);
+    RemoteLocatorJoinResponse joinResponse = mock(RemoteLocatorJoinResponse.class);
+    when(locatorClient.requestToServer(any(), eq(request), anyInt(), anyBoolean()))
+        .thenReturn(joinResponse);
+    doNothing().when(locatorDiscovery).addExchangedLocators(joinResponse);
+
+    when(locatorClient.requestToServer(any(), any(RemoteLocatorPingRequest.class), anyInt(),
+        anyBoolean())).thenThrow(new ClassCastException());
+
+    remoteLocatorDiscovery.run();
+
+    // Confirm that we did not retry after the first exception
+    verify(locatorDiscovery, times(1)).addExchangedLocators(any());
+    verify(locatorClient, times(1)).requestToServer(any(), eq(request), anyInt(), anyBoolean());
+    verify(locatorClient, times(1)).requestToServer(any(), any(RemoteLocatorPingRequest.class),
+        anyInt(), anyBoolean());
+  }
+}