You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2018/09/05 22:32:26 UTC
[geode] 01/01: GEODE-5591: handle IO exception and try with a
random port, then retry with next port
This is an automated email from the ASF dual-hosted git repository.
zhouxj pushed a commit to branch feature/GEODE-5591
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 9a447547fd1bf6b251964c395fb9c55c1ed13f05
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Wed Sep 5 15:29:40 2018 -0700
GEODE-5591: handle IO exception and try with a random port, then retry with next port
---
.../geode/internal/cache/wan/WANTestBase.java | 14 +---
.../cache/wan/misc/WANConfigurationJUnitTest.java | 15 +---
.../internal/cache/wan/GatewayReceiverImpl.java | 85 ++++++++++++----------
.../cache/wan/GatewayReceiverImplJUnitTest.java | 35 +++++++++
4 files changed, 88 insertions(+), 61 deletions(-)
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 0b1af66..a091942 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -37,6 +37,7 @@ import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATO
import static org.apache.geode.distributed.ConfigurationProperties.START_LOCATOR;
import static org.apache.geode.test.dunit.Host.getHost;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -2017,16 +2018,9 @@ public class WANTestBase extends DistributedTestCase {
fact.setManualStart(true);
fact.setBindAddress("200.112.204.10");
GatewayReceiver receiver = fact.create();
- try {
- receiver.start();
- fail("Expected GatewayReceiver Exception");
- } catch (GatewayReceiverException gRE) {
- logger.debug("Got the GatewayReceiverException", gRE);
- assertTrue(gRE.getMessage().contains("Failed to create server socket on"));
- } catch (IOException e) {
- e.printStackTrace();
- fail("Test " + test.getName() + " failed to start GatewayReceiver on port " + port);
- }
+ assertThatThrownBy(receiver::start)
+ .isInstanceOf(GatewayReceiverException.class)
+ .hasMessageContaining("No available free port found in the given range");
}
public static int createReceiverWithSSL(int locPort) {
diff --git a/geode-wan/src/integrationTest/java/org/apache/geode/internal/cache/wan/misc/WANConfigurationJUnitTest.java b/geode-wan/src/integrationTest/java/org/apache/geode/internal/cache/wan/misc/WANConfigurationJUnitTest.java
index edcd55f..1e8f361 100644
--- a/geode-wan/src/integrationTest/java/org/apache/geode/internal/cache/wan/misc/WANConfigurationJUnitTest.java
+++ b/geode-wan/src/integrationTest/java/org/apache/geode/internal/cache/wan/misc/WANConfigurationJUnitTest.java
@@ -437,25 +437,18 @@ public class WANConfigurationJUnitTest {
public void test_ValidateGatewayReceiverAttributes_WrongBindAddress() {
cache = new CacheFactory().set(MCAST_PORT, "0").create();
GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
- fact.setStartPort(50504);
+ fact.setStartPort(50505);
fact.setMaximumTimeBetweenPings(1000);
fact.setSocketBufferSize(4000);
- fact.setEndPort(70707);
+ fact.setEndPort(50505);
fact.setManualStart(true);
fact.setBindAddress("200.112.204.10");
GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
fact.addGatewayTransportFilter(myStreamFilter1);
+
GatewayReceiver receiver = fact.create();
- try {
- receiver.start();
- fail("Expected GatewayReceiverException");
- } catch (GatewayReceiverException gRE) {
- assertTrue(gRE.getMessage().contains("Failed to create server socket on"));
- } catch (IOException e) {
- e.printStackTrace();
- fail("The test failed with IOException");
- }
+ assertThatThrownBy(() -> receiver.start()).isInstanceOf(GatewayReceiverException.class).hasMessageContaining("Failed to create server socket on");
}
@Test
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java
index 0f0fc63..071e2a2 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java
@@ -15,8 +15,6 @@
package org.apache.geode.internal.cache.wan;
import java.io.IOException;
-import java.net.BindException;
-import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
@@ -138,6 +136,29 @@ public class GatewayReceiverImpl implements GatewayReceiver {
return receiver;
}
+ private boolean tryToStart() {
+ if (!AvailablePort.isPortAvailable(port, AvailablePort.SOCKET,
+ AvailablePort.getAddress(AvailablePort.SOCKET))) {
+ return false;
+ }
+
+ receiver.setPort(port);
+ receiver.setSocketBufferSize(socketBufferSize);
+ receiver.setMaximumTimeBetweenPings(timeBetPings);
+ if (hostnameForSenders != null && !hostnameForSenders.isEmpty()) {
+ receiver.setHostnameForClients(hostnameForSenders);
+ }
+ receiver.setBindAddress(bindAdd);
+ receiver.setGroups(new String[] {GatewayReceiver.RECEIVER_GROUP});
+ ((CacheServerImpl) receiver).setGatewayTransportFilter(this.filters);
+ try {
+ receiver.start();
+ return true;
+ } catch (IOException e) {
+ return false;
+ }
+ }
+
public void start() throws IOException {
if (receiver == null) {
receiver = this.cache.addCacheServer(true);
@@ -146,53 +167,37 @@ public class GatewayReceiverImpl implements GatewayReceiver {
logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_IS_ALREADY_RUNNING));
return;
}
+
+ int loopStartPort = getPortToStart();
+ int port = loopStartPort;
boolean started = false;
- this.port = getPortToStart();
- while (!started && this.port != -1) {
- receiver.setPort(this.port);
- receiver.setSocketBufferSize(socketBufferSize);
- receiver.setMaximumTimeBetweenPings(timeBetPings);
- if (hostnameForSenders != null && !hostnameForSenders.isEmpty()) {
- receiver.setHostnameForClients(hostnameForSenders);
- }
- receiver.setBindAddress(bindAdd);
- receiver.setGroups(new String[] {GatewayReceiver.RECEIVER_GROUP});
- ((CacheServerImpl) receiver).setGatewayTransportFilter(this.filters);
- try {
- receiver.start();
- started = true;
- } catch (BindException be) {
- if (be.getCause() != null
- && be.getCause().getMessage().contains("assign requested address")) {
- throw new GatewayReceiverException(
- LocalizedStrings.SocketCreator_FAILED_TO_CREATE_SERVER_SOCKET_ON_0_1
- .toLocalizedString(new Object[] {bindAdd, this.port}));
- }
- // ignore as this port might have been used by other threads.
- logger.warn(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_Address_Already_In_Use,
- this.port));
- this.port = getPortToStart();
- } catch (SocketException se) {
- if (se.getMessage().contains("Address already in use")) {
- logger.warn(LocalizedMessage
- .create(LocalizedStrings.GatewayReceiver_Address_Already_In_Use, this.port));
- this.port = getPortToStart();
-
- } else {
- throw se;
- }
+ while (!started) {
+ this.port = port;
+ started = tryToStart();
+ if (started) {
+ break;
}
+ // get next port to try
+ if (port == endPort && startPort != endPort) {
+ port = startPort;
+ } else {
+ port++;
+ }
+ if (port == loopStartPort || port > endPort) {
+ logger.warn("No available free port found in the given range (" +
+ this.startPort + "-" + this.endPort + ")");
+ throw new GatewayReceiverException(
+ LocalizedStrings.SocketCreator_FAILED_TO_CREATE_SERVER_SOCKET_ON_0_1
+ .toLocalizedString(new Object[] {bindAdd, this.port}));
+ }
}
- if (!started) {
- throw new IllegalStateException("No available free port found in the given range.");
- }
+
logger
.info(LocalizedMessage.create(LocalizedStrings.GatewayReceiver_STARTED_ON_PORT, this.port));
InternalDistributedSystem system = this.cache.getInternalDistributedSystem();
system.handleResourceEvent(ResourceEvent.GATEWAYRECEIVER_START, this);
-
}
private int getPortToStart() {
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverImplJUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverImplJUnitTest.java
index 3fd732a..497b8bf 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverImplJUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverImplJUnitTest.java
@@ -15,14 +15,17 @@
package org.apache.geode.internal.cache.wan;
import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
+import java.net.SocketException;
import java.net.UnknownHostException;
import org.junit.Test;
@@ -110,4 +113,36 @@ public class GatewayReceiverImplJUnitTest {
verify(cache, times(1)).removeGatewayReceiver(gateway);
}
+ @Test
+ public void testFailToStartWith2NextPorts() throws IOException {
+ InternalCache cache = mock(InternalCache.class);
+ CacheServerImpl server = mock(CacheServerImpl.class);
+ when(cache.addCacheServer(eq(true))).thenReturn(server);
+ doThrow(new SocketException("Address already in use")).when(server).start();
+ GatewayReceiverImpl gateway =
+ new GatewayReceiverImpl(cache, 2000, 2001, 5, 100, null, null, null, true);
+ assertThatThrownBy(() -> gateway.start()).isInstanceOf(GatewayReceiverException.class).hasMessageContaining("Failed to create server socket on");
+ }
+
+ @Test
+ public void testFailToStartWithSamePort() throws IOException {
+ InternalCache cache = mock(InternalCache.class);
+ CacheServerImpl server = mock(CacheServerImpl.class);
+ when(cache.addCacheServer(eq(true))).thenReturn(server);
+ doThrow(new SocketException("Address already in use")).when(server).start();
+ GatewayReceiverImpl gateway =
+ new GatewayReceiverImpl(cache, 2000, 2000, 5, 100, null, null, null, true);
+ assertThatThrownBy(() -> gateway.start()).isInstanceOf(GatewayReceiverException.class).hasMessageContaining("Failed to create server socket on");
+ }
+
+ @Test
+ public void testFailToStartWithARangeOfPorts() throws IOException {
+ InternalCache cache = mock(InternalCache.class);
+ CacheServerImpl server = mock(CacheServerImpl.class);
+ when(cache.addCacheServer(eq(true))).thenReturn(server);
+ doThrow(new SocketException("Address already in use")).when(server).start();
+ GatewayReceiverImpl gateway =
+ new GatewayReceiverImpl(cache, 2000, 2100, 5, 100, null, null, null, true);
+ assertThatThrownBy(() -> gateway.start()).isInstanceOf(GatewayReceiverException.class).hasMessageContaining("Failed to create server socket on");
+ }
}