You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by do...@apache.org on 2021/02/04 22:55:51 UTC
[geode] branch support/1.13 updated: GEODE-8890: Catch
ClassCastException in LocatorDiscovery.java (#6000)
This is an automated email from the ASF dual-hosted git repository.
donalevans pushed a commit to branch support/1.13
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.13 by this push:
new 27d0f5b GEODE-8890: Catch ClassCastException in LocatorDiscovery.java (#6000)
27d0f5b is described below
commit 27d0f5b3c97b7e118bb45797145d4ae924eff437
Author: Donal Evans <do...@pivotal.io>
AuthorDate: Thu Feb 4 13:38:03 2021 -0800
GEODE-8890: Catch ClassCastException in LocatorDiscovery.java (#6000)
- Handle ClassCastException the same way as ClassNotFoundException
- Added unit tests for LocatorDiscovery
- Modify log messages to be clearer
- Clean up LocatorDiscovery static analyzer warnings
Authored-by: Donal Evans <do...@vmware.com>
(cherry picked from commit f4423bb0649ca1d7c4b38bb4915ad69fe461dd01)
---
.../internal/locator/wan/LocatorDiscovery.java | 141 ++++----
.../internal/locator/wan/LocatorDiscoveryTest.java | 353 +++++++++++++++++++++
2 files changed, 432 insertions(+), 62 deletions(-)
diff --git a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorDiscovery.java b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorDiscovery.java
index 2be83f9..151134e 100644
--- a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorDiscovery.java
+++ b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorDiscovery.java
@@ -15,11 +15,14 @@
package org.apache.geode.cache.client.internal.locator.wan;
+import static org.apache.geode.distributed.internal.WanLocatorDiscoverer.WAN_LOCATOR_CONNECTION_TIMEOUT;
+
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.logging.log4j.Logger;
+import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.distributed.internal.WanLocatorDiscoverer;
import org.apache.geode.distributed.internal.tcpserver.HostAndPort;
import org.apache.geode.distributed.internal.tcpserver.TcpClient;
@@ -33,8 +36,8 @@ import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.util.internal.GeodeGlossary;
/**
- * This class represent a runnable task which exchange the locator information with local
- * locators(within the site) as well as remote locators (across the site)
+ * This class represents a runnable task which exchanges the locator information with local
+ * locators (within the site) as well as remote locators (across the site)
*
* @since GemFire 7.0
*/
@@ -42,24 +45,28 @@ public class LocatorDiscovery {
private static final Logger logger = LogService.getLogger();
- private WanLocatorDiscoverer discoverer;
+ private final WanLocatorDiscoverer discoverer;
- private DistributionLocatorId locatorId;
+ private final DistributionLocatorId locatorId;
- private LocatorMembershipListener locatorListener;
+ private final LocatorMembershipListener locatorListener;
RemoteLocatorJoinRequest request;
TcpClient locatorClient;
public static final int WAN_LOCATOR_CONNECTION_RETRY_ATTEMPT =
- Integer.getInteger("WANLocator.CONNECTION_RETRY_ATTEMPT", 50000).intValue();
+ Integer.getInteger("WANLocator.CONNECTION_RETRY_ATTEMPT", 50000);
public static final int WAN_LOCATOR_CONNECTION_INTERVAL =
- Integer.getInteger("WANLocator.CONNECTION_INTERVAL", 10000).intValue();
+ Integer.getInteger("WANLocator.CONNECTION_INTERVAL", 10000);
public static final int WAN_LOCATOR_PING_INTERVAL =
- Integer.getInteger("WANLocator.PING_INTERVAL", 10000).intValue();
+ Integer.getInteger("WANLocator.PING_INTERVAL", 10000);
+
+ // For testing. When true, Thread.sleep() is not called in exchangeLocalLocators() or
+ // exchangeRemoteLocators()
+ private final boolean skipWaiting;
public LocatorDiscovery(WanLocatorDiscoverer discoverer, DistributionLocatorId locator,
RemoteLocatorJoinRequest request, LocatorMembershipListener locatorListener) {
@@ -72,6 +79,20 @@ public class LocatorDiscovery {
InternalDataSerializer.getDSFIDSerializer().getObjectSerializer(),
InternalDataSerializer.getDSFIDSerializer().getObjectDeserializer(),
TcpSocketFactory.DEFAULT);
+ this.skipWaiting = false;
+ }
+
+ // Test constructor
+ @VisibleForTesting
+ LocatorDiscovery(WanLocatorDiscoverer discoverer, DistributionLocatorId locator,
+ RemoteLocatorJoinRequest request, LocatorMembershipListener locatorListener,
+ TcpClient locatorClient) {
+ this.discoverer = discoverer;
+ this.locatorId = locator;
+ this.request = request;
+ this.locatorListener = locatorListener;
+ this.locatorClient = locatorClient;
+ this.skipWaiting = true;
}
/**
@@ -79,7 +100,7 @@ public class LocatorDiscovery {
* swamp the logs in retries due to same batch failures.
*/
private final ConcurrentHashMap<DistributionLocatorId, long[]> failureLogInterval =
- new ConcurrentHashMap<DistributionLocatorId, long[]>();
+ new ConcurrentHashMap<>();
/**
* The maximum size of {@link #failureLogInterval} beyond which it will start logging all failure
@@ -96,10 +117,10 @@ public class LocatorDiscovery {
public boolean skipFailureLogging(DistributionLocatorId locatorId) {
boolean skipLogging = false;
- if (this.failureLogInterval.size() < FAILURE_MAP_MAXSIZE) {
- long[] logInterval = this.failureLogInterval.get(locatorId);
+ if (failureLogInterval.size() < FAILURE_MAP_MAXSIZE) {
+ long[] logInterval = failureLogInterval.get(locatorId);
if (logInterval == null) {
- logInterval = this.failureLogInterval.putIfAbsent(locatorId,
+ logInterval = failureLogInterval.putIfAbsent(locatorId,
new long[] {System.currentTimeMillis(), 1000});
}
if (logInterval != null) {
@@ -132,21 +153,17 @@ public class LocatorDiscovery {
}
}
- private WanLocatorDiscoverer getDiscoverer() {
- return this.discoverer;
- }
-
private void exchangeLocalLocators() {
int retryAttempt = 1;
- while (!getDiscoverer().isStopped()) {
+ while (!discoverer.isStopped()) {
try {
- RemoteLocatorJoinResponse response =
- (RemoteLocatorJoinResponse) locatorClient.requestToServer(locatorId.getHost(),
- request, WanLocatorDiscoverer.WAN_LOCATOR_CONNECTION_TIMEOUT, true);
+ RemoteLocatorJoinResponse response = (RemoteLocatorJoinResponse) locatorClient
+ .requestToServer(locatorId.getHost(), request, WAN_LOCATOR_CONNECTION_TIMEOUT, true);
if (response != null) {
- LocatorHelper.addExchangedLocators(response.getLocators(), this.locatorListener);
- logger.info("Locator discovery task exchanged locator information {} with {}: {}.",
- new Object[] {request.getLocator(), locatorId, response.getLocators()});
+ addExchangedLocators(response);
+ logger.info(
+ "Locator discovery task for locator {} exchanged locator information with {}: {}.",
+ request.getLocator(), locatorId, response.getLocators());
break;
}
} catch (IOException ioe) {
@@ -154,28 +171,26 @@ public class LocatorDiscovery {
ConnectionException coe =
new ConnectionException("Not able to connect to local locator after "
+ WAN_LOCATOR_CONNECTION_RETRY_ATTEMPT + " retry attempts", ioe);
- logger.fatal(String.format(
- "Locator discovery task could not exchange locator information %s with %s after %s retry attempts.",
- new Object[] {request.getLocator(), locatorId, retryAttempt}),
- coe);
+ logger.fatal(
+ "Locator discovery task for locator {} could not exchange locator information with {} after {} retry attempts.",
+ request.getLocator(), locatorId, retryAttempt, coe);
break;
}
if (skipFailureLogging(locatorId)) {
logger.warn(
- "Locator discovery task could not exchange locator information {} with {} after {} retry attempts. Retrying in {} ms.",
- new Object[] {request.getLocator(), locatorId, retryAttempt,
- WAN_LOCATOR_CONNECTION_INTERVAL});
+ "Locator discovery task for locator {} could not exchange locator information with {} after {} retry attempts. Retrying in {} ms.",
+ request.getLocator(), locatorId, retryAttempt, WAN_LOCATOR_CONNECTION_INTERVAL);
}
try {
- Thread.sleep(WAN_LOCATOR_CONNECTION_INTERVAL);
+ if (!skipWaiting) {
+ Thread.sleep(WAN_LOCATOR_CONNECTION_INTERVAL);
+ }
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
retryAttempt++;
- continue;
- } catch (ClassNotFoundException classNotFoundException) {
- logger.fatal("Locator discovery task encountred unexpected exception",
- classNotFoundException);
+ } catch (ClassNotFoundException | ClassCastException ex) {
+ logger.fatal("Locator discovery task encountered unexpected exception", ex);
break;
}
}
@@ -183,25 +198,23 @@ public class LocatorDiscovery {
public void exchangeRemoteLocators() {
int retryAttempt = 1;
- DistributionLocatorId remoteLocator = this.locatorId;
- while (!getDiscoverer().isStopped()) {
- RemoteLocatorJoinResponse response;
+ while (!discoverer.isStopped()) {
try {
- response =
- (RemoteLocatorJoinResponse) locatorClient.requestToServer(
- remoteLocator.getHost(),
- request, WanLocatorDiscoverer.WAN_LOCATOR_CONNECTION_TIMEOUT, true);
+ RemoteLocatorJoinResponse response = (RemoteLocatorJoinResponse) locatorClient
+ .requestToServer(locatorId.getHost(), request, WAN_LOCATOR_CONNECTION_TIMEOUT, true);
if (response != null) {
- LocatorHelper.addExchangedLocators(response.getLocators(), this.locatorListener);
- logger.info("Locator discovery task exchanged locator information {} with {}: {}.",
- new Object[] {request.getLocator(), locatorId, response.getLocators()});
+ addExchangedLocators(response);
+ logger.info(
+ "Locator discovery task for locator {} exchanged locator information with {}: {}.",
+ request.getLocator(), locatorId, response.getLocators());
RemoteLocatorPingRequest pingRequest = new RemoteLocatorPingRequest("");
while (true) {
- Thread.sleep(WAN_LOCATOR_PING_INTERVAL);
- RemoteLocatorPingResponse pingResponse =
- (RemoteLocatorPingResponse) locatorClient.requestToServer(
- new HostAndPort(remoteLocator.getHostName(), remoteLocator.getPort()),
- pingRequest, WanLocatorDiscoverer.WAN_LOCATOR_CONNECTION_TIMEOUT, true);
+ if (!skipWaiting) {
+ Thread.sleep(WAN_LOCATOR_PING_INTERVAL);
+ }
+ RemoteLocatorPingResponse pingResponse = (RemoteLocatorPingResponse) locatorClient
+ .requestToServer(new HostAndPort(locatorId.getHostName(), locatorId.getPort()),
+ pingRequest, WAN_LOCATOR_CONNECTION_TIMEOUT, true);
if (pingResponse != null) {
continue;
}
@@ -210,28 +223,27 @@ public class LocatorDiscovery {
}
} catch (IOException ioe) {
if (retryAttempt == WAN_LOCATOR_CONNECTION_RETRY_ATTEMPT) {
- logger.fatal(String.format(
- "Locator discovery task could not exchange locator information %s with %s after %s retry attempts.",
- new Object[] {request.getLocator(), remoteLocator, retryAttempt}),
- ioe);
+ logger.fatal(
+ "Locator discovery task for locator {} could not exchange locator information with {} after {} retry attempts.",
+ request.getLocator(), locatorId, retryAttempt, ioe);
break;
}
- if (skipFailureLogging(remoteLocator)) {
+ if (skipFailureLogging(locatorId)) {
logger.warn(
- "Locator discovery task could not exchange locator information {} with {} after {} retry attempts. Retrying in {} ms.",
- new Object[] {request.getLocator(), remoteLocator, retryAttempt,
+ "Locator discovery task for locator {} could not exchange locator information with {} after {} retry attempts. Retrying in {} ms.",
+ new Object[] {request.getLocator(), locatorId, retryAttempt,
WAN_LOCATOR_CONNECTION_INTERVAL});
}
try {
- Thread.sleep(WAN_LOCATOR_CONNECTION_INTERVAL);
+ if (!skipWaiting) {
+ Thread.sleep(WAN_LOCATOR_CONNECTION_INTERVAL);
+ }
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
retryAttempt++;
- continue;
- } catch (ClassNotFoundException classNotFoundException) {
- logger.fatal("Locator discovery task encountred unexpected exception",
- classNotFoundException);
+ } catch (ClassNotFoundException | ClassCastException ex) {
+ logger.fatal("Locator discovery task encountered unexpected exception", ex);
break;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -239,4 +251,9 @@ public class LocatorDiscovery {
}
}
+ @VisibleForTesting
+ void addExchangedLocators(RemoteLocatorJoinResponse response) {
+ LocatorHelper.addExchangedLocators(response.getLocators(), locatorListener);
+ }
+
}
diff --git a/geode-wan/src/test/java/org/apache/geode/cache/client/internal/locator/wan/LocatorDiscoveryTest.java b/geode-wan/src/test/java/org/apache/geode/cache/client/internal/locator/wan/LocatorDiscoveryTest.java
new file mode 100644
index 0000000..03983cd
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/cache/client/internal/locator/wan/LocatorDiscoveryTest.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.client.internal.locator.wan;
+
+import static org.apache.geode.internal.Assert.assertTrue;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.distributed.internal.WanLocatorDiscoverer;
+import org.apache.geode.distributed.internal.tcpserver.TcpClient;
+import org.apache.geode.internal.admin.remote.DistributionLocatorId;
+
+public class LocatorDiscoveryTest {
+ private DistributionLocatorId locatorId;
+ private RemoteLocatorJoinRequest request;
+ private LocatorDiscovery locatorDiscovery;
+ private TcpClient locatorClient;
+ private WanLocatorDiscoverer discoverer;
+ private LocatorDiscovery.LocalLocatorDiscovery localLocatorDiscovery;
+ private LocatorDiscovery.RemoteLocatorDiscovery remoteLocatorDiscovery;
+
+ @Before
+ public void setUp() {
+ discoverer = mock(WanLocatorDiscoverer.class);
+ locatorId = mock(DistributionLocatorId.class);
+ request = mock(RemoteLocatorJoinRequest.class);
+ LocatorMembershipListener listener = mock(LocatorMembershipListener.class);
+ locatorClient = mock(TcpClient.class);
+ locatorDiscovery =
+ spy(new LocatorDiscovery(discoverer, locatorId, request, listener, locatorClient));
+ localLocatorDiscovery = locatorDiscovery.new LocalLocatorDiscovery();
+ remoteLocatorDiscovery = locatorDiscovery.new RemoteLocatorDiscovery();
+ }
+
+ @Test
+ public void skipFailureLoggingReturnsCorrectly() {
+ // First call should always be false
+ assertFalse(locatorDiscovery.skipFailureLogging(locatorId));
+ long firstReturnedFalse = System.currentTimeMillis();
+
+ // Next calls should only be false if more than 1000ms has passed since the last call that
+ // returned false
+ assertTrue(locatorDiscovery.skipFailureLogging(locatorId));
+ assertTrue(locatorDiscovery.skipFailureLogging(locatorId));
+ await().until(() -> System.currentTimeMillis() - firstReturnedFalse > 1000);
+ assertFalse(locatorDiscovery.skipFailureLogging(locatorId));
+ long secondReturnedFalse = System.currentTimeMillis();
+
+ // Next calls should only be false if more than 2000ms has passed since the last call that
+ // returned false
+ assertTrue(locatorDiscovery.skipFailureLogging(locatorId));
+ await().until(() -> System.currentTimeMillis() - secondReturnedFalse > 2000);
+ assertFalse(locatorDiscovery.skipFailureLogging(locatorId));
+ }
+
+ @Test
+ public void localLocatorDiscoveryDoesNothingWhenDiscovererIsStopped() {
+ when(discoverer.isStopped()).thenReturn(true);
+ localLocatorDiscovery.run();
+ verifyNoInteractions(locatorClient);
+ }
+
+ @Test
+ public void localLocatorDiscoveryStopsWithNonNullResponse()
+ throws IOException, ClassNotFoundException {
+ // Only allow two attempts before stopping the locator discoverer
+ when(discoverer.isStopped()).thenReturn(false).thenReturn(false).thenReturn(true);
+ RemoteLocatorJoinResponse response = mock(RemoteLocatorJoinResponse.class);
+ when(locatorClient.requestToServer(any(), eq(request), anyInt(), anyBoolean()))
+ .thenReturn(response);
+ doNothing().when(locatorDiscovery).addExchangedLocators(response);
+
+ localLocatorDiscovery.run();
+
+ // Confirm that we stopped after the first response was received
+ verify(locatorDiscovery, times(1)).addExchangedLocators(response);
+ verify(locatorClient, times(1)).requestToServer(any(), eq(request), anyInt(), anyBoolean());
+ }
+
+ @Test
+ public void localLocatorDiscoveryRetriesWithNullResponse()
+ throws IOException, ClassNotFoundException {
+ // Only allow two attempts before stopping the locator discoverer
+ when(discoverer.isStopped()).thenReturn(false).thenReturn(false).thenReturn(true);
+ when(locatorClient.requestToServer(any(), eq(request), anyInt(), anyBoolean()))
+ .thenReturn(null);
+
+ localLocatorDiscovery.run();
+
+ // Confirm that we retried each time the response was null
+ verify(locatorDiscovery, times(0)).addExchangedLocators(any());
+ verify(locatorClient, times(2)).requestToServer(any(), eq(request), anyInt(), anyBoolean());
+ }
+
+ @Test
+ public void localLocatorRetriesWhenIOException() throws IOException, ClassNotFoundException {
+ // Only allow two attempts before stopping the locator discoverer
+ when(discoverer.isStopped()).thenReturn(false).thenReturn(false).thenReturn(true);
+ when(locatorClient.requestToServer(any(), eq(request), anyInt(), anyBoolean()))
+ .thenThrow(new IOException());
+
+ localLocatorDiscovery.run();
+
+ // Confirm that we retried after the first exception was thrown
+ verify(locatorDiscovery, times(0)).addExchangedLocators(any());
+ verify(locatorClient, times(2)).requestToServer(any(), eq(request), anyInt(), anyBoolean());
+ }
+
+ @Test
+ public void localLocatorDoesNotRetryWhenClassNotFoundException()
+ throws IOException, ClassNotFoundException {
+ // Only allow two attempts before stopping the locator discoverer
+ when(discoverer.isStopped()).thenReturn(false).thenReturn(false).thenReturn(true);
+ when(locatorClient.requestToServer(any(), eq(request), anyInt(), anyBoolean()))
+ .thenThrow(new ClassNotFoundException());
+
+ localLocatorDiscovery.run();
+
+ // Confirm that we did not retry after the first exception
+ verify(locatorDiscovery, times(0)).addExchangedLocators(any());
+ verify(locatorClient, times(1)).requestToServer(any(), eq(request), anyInt(), anyBoolean());
+ }
+
+ @Test
+ public void localLocatorDoesNotRetryWhenClassCastException()
+ throws IOException, ClassNotFoundException {
+ // Only allow two attempts before stopping the locator discoverer
+ when(discoverer.isStopped()).thenReturn(false).thenReturn(false).thenReturn(true);
+ when(locatorClient.requestToServer(any(), eq(request), anyInt(), anyBoolean()))
+ .thenThrow(new ClassCastException());
+
+ localLocatorDiscovery.run();
+
+ // Confirm that we did not retry after the first exception
+ verify(locatorDiscovery, times(0)).addExchangedLocators(any());
+ verify(locatorClient, times(1)).requestToServer(any(), eq(request), anyInt(), anyBoolean());
+ }
+
+ @Test
+ public void remoteLocatorDiscoveryDoesNothingWhenDiscovererIsStopped() {
+ when(discoverer.isStopped()).thenReturn(true);
+ remoteLocatorDiscovery.run();
+ verifyNoInteractions(locatorClient);
+ }
+
+ @Test
+ public void remoteLocatorDiscoveryPingsRemoteWhenJoinResponseIsNotNull()
+ throws IOException, ClassNotFoundException {
+ // Only allow one attempt before stopping the locator discoverer
+ when(discoverer.isStopped()).thenReturn(false).thenReturn(true);
+ RemoteLocatorJoinResponse joinResponse = mock(RemoteLocatorJoinResponse.class);
+ when(locatorClient.requestToServer(any(), eq(request), anyInt(), anyBoolean()))
+ .thenReturn(joinResponse);
+ doNothing().when(locatorDiscovery).addExchangedLocators(joinResponse);
+
+ // Return null to prevent the ping loop continuing forever
+ when(locatorClient.requestToServer(any(), any(RemoteLocatorPingRequest.class), anyInt(),
+ anyBoolean())).thenReturn(null);
+
+ remoteLocatorDiscovery.run();
+
+ // Confirm that we sent a ping request after the first joinResponse was received
+ verify(locatorDiscovery, times(1)).addExchangedLocators(joinResponse);
+ verify(locatorClient, times(1)).requestToServer(any(), eq(request), anyInt(), anyBoolean());
+ verify(locatorClient, times(1)).requestToServer(any(), any(RemoteLocatorPingRequest.class),
+ anyInt(), anyBoolean());
+ }
+
+ @Test
+ public void remoteLocatorDiscoveryRetriesPingRemoteWhenJoinResponseIsNotNullAndPingResponseIsNotNull()
+ throws IOException, ClassNotFoundException {
+ // Only allow one attempt before stopping the locator discoverer
+ when(discoverer.isStopped()).thenReturn(false).thenReturn(true);
+ RemoteLocatorJoinResponse joinResponse = mock(RemoteLocatorJoinResponse.class);
+ when(locatorClient.requestToServer(any(), eq(request), anyInt(), anyBoolean()))
+ .thenReturn(joinResponse);
+ doNothing().when(locatorDiscovery).addExchangedLocators(joinResponse);
+
+ RemoteLocatorPingResponse pingResponse = mock(RemoteLocatorPingResponse.class);
+ // Return a non-null RemoteLocatorPingResponse, then return null to prevent the ping loop
+ // continuing forever
+ when(locatorClient.requestToServer(any(), any(RemoteLocatorPingRequest.class), anyInt(),
+ anyBoolean())).thenReturn(pingResponse).thenReturn(null);
+
+ remoteLocatorDiscovery.run();
+
+ // Confirm that we retried pinging the remote locator
+ verify(locatorDiscovery, times(1)).addExchangedLocators(joinResponse);
+ verify(locatorClient, times(1)).requestToServer(any(), eq(request), anyInt(), anyBoolean());
+ verify(locatorClient, times(2)).requestToServer(any(), any(RemoteLocatorPingRequest.class),
+ anyInt(), anyBoolean());
+ }
+
+ @Test
+ public void remoteLocatorDiscoveryRetriesWithNullJoinResponse()
+ throws IOException, ClassNotFoundException {
+ // Only allow two attempts before stopping the locator discoverer
+ when(discoverer.isStopped()).thenReturn(false).thenReturn(false).thenReturn(true);
+ when(locatorClient.requestToServer(any(), eq(request), anyInt(), anyBoolean()))
+ .thenReturn(null);
+
+ remoteLocatorDiscovery.run();
+
+ // Confirm that we retried each time the response was null
+ verify(locatorDiscovery, times(0)).addExchangedLocators(any());
+ verify(locatorClient, times(2)).requestToServer(any(), eq(request), anyInt(), anyBoolean());
+ verify(locatorClient, times(0)).requestToServer(any(), any(RemoteLocatorPingRequest.class),
+ anyInt(), anyBoolean());
+ }
+
+ @Test
+ public void remoteLocatorRetriesWhenIOExceptionWhenSendingRemoteLocatorJoinRequest()
+ throws IOException, ClassNotFoundException {
+ // Only allow two attempts before stopping the locator discoverer
+ when(discoverer.isStopped()).thenReturn(false).thenReturn(false).thenReturn(true);
+ when(locatorClient.requestToServer(any(), eq(request), anyInt(), anyBoolean()))
+ .thenThrow(new IOException());
+
+ remoteLocatorDiscovery.run();
+
+ // Confirm that we retried after the first exception was thrown
+ verify(locatorDiscovery, times(0)).addExchangedLocators(any());
+ verify(locatorClient, times(2)).requestToServer(any(), eq(request), anyInt(), anyBoolean());
+ }
+
+ @Test
+ public void remoteLocatorRetriesWhenIOExceptionWhenSendingRemoteLocatorPingRequest()
+ throws IOException, ClassNotFoundException {
+ // Only allow two attempts before stopping the locator discoverer
+ when(discoverer.isStopped()).thenReturn(false).thenReturn(false).thenReturn(true);
+ RemoteLocatorJoinResponse joinResponse = mock(RemoteLocatorJoinResponse.class);
+ when(locatorClient.requestToServer(any(), eq(request), anyInt(), anyBoolean()))
+ .thenReturn(joinResponse);
+ doNothing().when(locatorDiscovery).addExchangedLocators(joinResponse);
+
+ when(locatorClient.requestToServer(any(), any(RemoteLocatorPingRequest.class), anyInt(),
+ anyBoolean())).thenThrow(new IOException());
+
+ remoteLocatorDiscovery.run();
+
+ // Confirm that we retried after the first exception was thrown
+ verify(locatorDiscovery, times(2)).addExchangedLocators(any());
+ verify(locatorClient, times(2)).requestToServer(any(), eq(request), anyInt(), anyBoolean());
+ verify(locatorClient, times(2)).requestToServer(any(), any(RemoteLocatorPingRequest.class),
+ anyInt(), anyBoolean());
+ }
+
+ @Test
+ public void remoteLocatorDoesNotRetryWhenClassNotFoundExceptionFromJoinRequest()
+ throws IOException, ClassNotFoundException {
+ // Only allow two attempts before stopping the locator discoverer
+ when(discoverer.isStopped()).thenReturn(false).thenReturn(false).thenReturn(true);
+ when(locatorClient.requestToServer(any(), eq(request), anyInt(), anyBoolean()))
+ .thenThrow(new ClassNotFoundException());
+
+ remoteLocatorDiscovery.run();
+
+ // Confirm that we did not retry after the first exception
+ verify(locatorDiscovery, times(0)).addExchangedLocators(any());
+ verify(locatorClient, times(1)).requestToServer(any(), eq(request), anyInt(), anyBoolean());
+ verify(locatorClient, times(0)).requestToServer(any(), any(RemoteLocatorPingRequest.class),
+ anyInt(), anyBoolean());
+ }
+
+ @Test
+ public void remoteLocatorDoesNotRetryWhenClassNotFoundExceptionFromPingRequest()
+ throws IOException, ClassNotFoundException {
+ // Only allow two attempts before stopping the locator discoverer
+ when(discoverer.isStopped()).thenReturn(false).thenReturn(false).thenReturn(true);
+ RemoteLocatorJoinResponse joinResponse = mock(RemoteLocatorJoinResponse.class);
+ when(locatorClient.requestToServer(any(), eq(request), anyInt(), anyBoolean()))
+ .thenReturn(joinResponse);
+ doNothing().when(locatorDiscovery).addExchangedLocators(joinResponse);
+
+ when(locatorClient.requestToServer(any(), any(RemoteLocatorPingRequest.class), anyInt(),
+ anyBoolean())).thenThrow(new ClassNotFoundException());
+
+ remoteLocatorDiscovery.run();
+
+ // Confirm that we did not retry after the first exception
+ verify(locatorDiscovery, times(1)).addExchangedLocators(any());
+ verify(locatorClient, times(1)).requestToServer(any(), eq(request), anyInt(), anyBoolean());
+ verify(locatorClient, times(1)).requestToServer(any(), any(RemoteLocatorPingRequest.class),
+ anyInt(), anyBoolean());
+ }
+
+ @Test
+ public void remoteLocatorDoesNotRetryWhenClassCastExceptionFromJoinRequest()
+ throws IOException, ClassNotFoundException {
+ // Only allow two attempts before stopping the locator discoverer
+ when(discoverer.isStopped()).thenReturn(false).thenReturn(false).thenReturn(true);
+ when(locatorClient.requestToServer(any(), eq(request), anyInt(), anyBoolean()))
+ .thenThrow(new ClassCastException());
+
+ remoteLocatorDiscovery.run();
+
+ // Confirm that we did not retry after the first exception
+ verify(locatorDiscovery, times(0)).addExchangedLocators(any());
+ verify(locatorClient, times(1)).requestToServer(any(), eq(request), anyInt(), anyBoolean());
+ verify(locatorClient, times(0)).requestToServer(any(), any(RemoteLocatorPingRequest.class),
+ anyInt(), anyBoolean());
+ }
+
+ @Test
+ public void remoteLocatorDoesNotRetryWhenClassCastExceptionFromPingRequest()
+ throws IOException, ClassNotFoundException {
+ // Only allow two attempts before stopping the locator discoverer
+ when(discoverer.isStopped()).thenReturn(false).thenReturn(false).thenReturn(true);
+ RemoteLocatorJoinResponse joinResponse = mock(RemoteLocatorJoinResponse.class);
+ when(locatorClient.requestToServer(any(), eq(request), anyInt(), anyBoolean()))
+ .thenReturn(joinResponse);
+ doNothing().when(locatorDiscovery).addExchangedLocators(joinResponse);
+
+ when(locatorClient.requestToServer(any(), any(RemoteLocatorPingRequest.class), anyInt(),
+ anyBoolean())).thenThrow(new ClassCastException());
+
+ remoteLocatorDiscovery.run();
+
+ // Confirm that we did not retry after the first exception
+ verify(locatorDiscovery, times(1)).addExchangedLocators(any());
+ verify(locatorClient, times(1)).requestToServer(any(), eq(request), anyInt(), anyBoolean());
+ verify(locatorClient, times(1)).requestToServer(any(), any(RemoteLocatorPingRequest.class),
+ anyInt(), anyBoolean());
+ }
+}