You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/08/26 16:50:49 UTC
[2/2] flink git commit: [FLINK-4418] [client] Improve resilience when
InetAddress.getLocalHost() throws UnknownHostException
[FLINK-4418] [client] Improve resilience when InetAddress.getLocalHost() throws UnknownHostException
- If InetAddress.getLocalHost() throws UnknownHostException when
attempting to connect with LOCAL_HOST strategy, the code will move on
to try the other strategies instead of immediately failing.
- Also made minor code style improvements for trying the different strategies.
This closes #2383
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53f5a8cc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53f5a8cc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53f5a8cc
Branch: refs/heads/master
Commit: 53f5a8cc572afca617eca5ae128bb42073e9fcb2
Parents: 6a456c6
Author: Shannon Carey <re...@gmail.com>
Authored: Wed Aug 17 19:35:49 2016 -0500
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 26 17:53:19 2016 +0200
----------------------------------------------------------------------
.../flink/runtime/net/ConnectionUtils.java | 46 ++++++++---------
.../flink/runtime/net/ConnectionUtilsTest.java | 53 ++++++++++++++++++--
2 files changed, 71 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/53f5a8cc/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
index 77324fa..dcf5a62 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
@@ -24,7 +24,11 @@ import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.Socket;
import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.Enumeration;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -37,6 +41,7 @@ import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
+
/**
* Utilities to determine the network interface and address that should be used to bind the
* TaskManager communication to.
@@ -110,40 +115,27 @@ public class ConnectionUtils {
long currentSleepTime = MIN_SLEEP_TIME;
long elapsedTime = 0;
+ final List<AddressDetectionState> strategies = Collections.unmodifiableList(
+ Arrays.asList(
+ AddressDetectionState.LOCAL_HOST,
+ AddressDetectionState.ADDRESS,
+ AddressDetectionState.FAST_CONNECT,
+ AddressDetectionState.SLOW_CONNECT));
+
// loop while there is time left
while (elapsedTime < maxWaitMillis) {
- AddressDetectionState strategy = AddressDetectionState.LOCAL_HOST;
-
boolean logging = elapsedTime >= startLoggingAfter;
if (logging) {
LOG.info("Trying to connect to " + targetAddress);
}
- // go over the strategies ADDRESS - FAST_CONNECT - SLOW_CONNECT
- do {
+
+ // Try each strategy in order
+ for (AddressDetectionState strategy : strategies) {
InetAddress address = findAddressUsingStrategy(strategy, targetAddress, logging);
if (address != null) {
return address;
}
-
- // pick the next strategy
- switch (strategy) {
- case LOCAL_HOST:
- strategy = AddressDetectionState.ADDRESS;
- break;
- case ADDRESS:
- strategy = AddressDetectionState.FAST_CONNECT;
- break;
- case FAST_CONNECT:
- strategy = AddressDetectionState.SLOW_CONNECT;
- break;
- case SLOW_CONNECT:
- strategy = null;
- break;
- default:
- throw new RuntimeException("Unsupported strategy: " + strategy);
- }
}
- while (strategy != null);
// we have made a pass with all strategies over all interfaces
// sleep for a while before we make the next pass
@@ -229,7 +221,13 @@ public class ConnectionUtils {
{
// try LOCAL_HOST strategy independent of the network interfaces
if (strategy == AddressDetectionState.LOCAL_HOST) {
- InetAddress localhostName = InetAddress.getLocalHost();
+ InetAddress localhostName;
+ try {
+ localhostName = InetAddress.getLocalHost();
+ } catch (UnknownHostException uhe) {
+ LOG.warn("Could not resolve local hostname to an IP address: {}", uhe.getMessage());
+ return null;
+ }
if (tryToConnect(localhostName, targetAddress, strategy.getTimeout(), logging)) {
LOG.debug("Using InetAddress.getLocalHost() immediately for the connecting address");
http://git-wip-us.apache.org/repos/asf/flink/blob/53f5a8cc/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
index 570f87c..13a8214 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
@@ -17,17 +17,30 @@
*/
package org.apache.flink.runtime.net;
-import static org.junit.Assert.*;
-
-import org.junit.Test;
-
+import java.io.IOException;
+import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
+import java.net.UnknownHostException;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* Tests for the network utilities.
*/
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ConnectionUtils.class)
public class ConnectionUtilsTest {
@Test
@@ -55,4 +68,36 @@ public class ConnectionUtilsTest {
fail(e.getMessage());
}
}
+
+ @Test
+ public void testFindConnectingAddressWhenGetLocalHostThrows() throws Exception {
+ PowerMockito.mockStatic(InetAddress.class);
+ Mockito.when(InetAddress.getLocalHost()).thenThrow(new UnknownHostException()).thenCallRealMethod();
+
+ final InetAddress loopbackAddress = Inet4Address.getByName("127.0.0.1");
+ Thread socketServerThread;
+ try (ServerSocket socket = new ServerSocket(0, 1, loopbackAddress)) {
+ // Make sure that the thread will eventually die even if something else goes wrong
+ socket.setSoTimeout(10_000);
+ socketServerThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ socket.accept();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+ });
+ socketServerThread.start();
+
+ final InetSocketAddress socketAddress = new InetSocketAddress(loopbackAddress, socket.getLocalPort());
+ final InetAddress address = ConnectionUtils.findConnectingAddress(
+ socketAddress, 2000, 400);
+
+ PowerMockito.verifyStatic();
+ // Make sure we got an address via alternative means
+ assertNotNull(address);
+ }
+ }
}