You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/08/26 16:50:49 UTC

[2/2] flink git commit: [FLINK-4418] [client] Improve resilience when InetAddress.getLocalHost() throws UnknownHostException

[FLINK-4418] [client] Improve resilience when InetAddress.getLocalHost() throws UnknownHostException

- If InetAddress.getLocalHost() throws UnknownHostException when
  attempting to connect with the LOCAL_HOST strategy, the code now moves
  on to try the other strategies instead of failing immediately.

- Also made minor code-style improvements to the loop that tries the different strategies.

This closes #2383


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53f5a8cc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53f5a8cc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53f5a8cc

Branch: refs/heads/master
Commit: 53f5a8cc572afca617eca5ae128bb42073e9fcb2
Parents: 6a456c6
Author: Shannon Carey <re...@gmail.com>
Authored: Wed Aug 17 19:35:49 2016 -0500
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 26 17:53:19 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/net/ConnectionUtils.java      | 46 ++++++++---------
 .../flink/runtime/net/ConnectionUtilsTest.java  | 53 ++++++++++++++++++--
 2 files changed, 71 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/53f5a8cc/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
index 77324fa..dcf5a62 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
@@ -24,7 +24,11 @@ import java.net.InetSocketAddress;
 import java.net.NetworkInterface;
 import java.net.Socket;
 import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.Enumeration;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -37,6 +41,7 @@ import org.slf4j.LoggerFactory;
 
 import scala.concurrent.duration.FiniteDuration;
 
+
 /**
  * Utilities to determine the network interface and address that should be used to bind the
  * TaskManager communication to.
@@ -110,40 +115,27 @@ public class ConnectionUtils {
 		long currentSleepTime = MIN_SLEEP_TIME;
 		long elapsedTime = 0;
 
+		final List<AddressDetectionState> strategies = Collections.unmodifiableList(
+			Arrays.asList(
+				AddressDetectionState.LOCAL_HOST,
+				AddressDetectionState.ADDRESS,
+				AddressDetectionState.FAST_CONNECT,
+				AddressDetectionState.SLOW_CONNECT));
+
 		// loop while there is time left
 		while (elapsedTime < maxWaitMillis) {
-			AddressDetectionState strategy = AddressDetectionState.LOCAL_HOST;
-
 			boolean logging = elapsedTime >= startLoggingAfter;
 			if (logging) {
 				LOG.info("Trying to connect to " + targetAddress);
 			}
-			// go over the strategies ADDRESS - FAST_CONNECT - SLOW_CONNECT
-			do {
+
+			// Try each strategy in order
+			for (AddressDetectionState strategy : strategies) {
 				InetAddress address = findAddressUsingStrategy(strategy, targetAddress, logging);
 				if (address != null) {
 					return address;
 				}
-
-				// pick the next strategy
-				switch (strategy) {
-					case LOCAL_HOST:
-						strategy = AddressDetectionState.ADDRESS;
-						break;
-					case ADDRESS:
-						strategy = AddressDetectionState.FAST_CONNECT;
-						break;
-					case FAST_CONNECT:
-						strategy = AddressDetectionState.SLOW_CONNECT;
-						break;
-					case SLOW_CONNECT:
-						strategy = null;
-						break;
-					default:
-						throw new RuntimeException("Unsupported strategy: " + strategy);
-				}
 			}
-			while (strategy != null);
 
 			// we have made a pass with all strategies over all interfaces
 			// sleep for a while before we make the next pass
@@ -229,7 +221,13 @@ public class ConnectionUtils {
 	{
 		// try LOCAL_HOST strategy independent of the network interfaces
 		if (strategy == AddressDetectionState.LOCAL_HOST) {
-			InetAddress localhostName = InetAddress.getLocalHost();
+			InetAddress localhostName;
+			try {
+				localhostName = InetAddress.getLocalHost();
+			} catch (UnknownHostException uhe) {
+				LOG.warn("Could not resolve local hostname to an IP address: {}", uhe.getMessage());
+				return null;
+			}
 
 			if (tryToConnect(localhostName, targetAddress, strategy.getTimeout(), logging)) {
 				LOG.debug("Using InetAddress.getLocalHost() immediately for the connecting address");

http://git-wip-us.apache.org/repos/asf/flink/blob/53f5a8cc/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
index 570f87c..13a8214 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
@@ -17,17 +17,30 @@
  */
 package org.apache.flink.runtime.net;
 
-import static org.junit.Assert.*;
-
-import org.junit.Test;
-
+import java.io.IOException;
+import java.net.Inet4Address;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
+import java.net.UnknownHostException;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the network utilities.
  */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ConnectionUtils.class)
 public class ConnectionUtilsTest {
 
 	@Test
@@ -55,4 +68,36 @@ public class ConnectionUtilsTest {
 			fail(e.getMessage());
 		}
 	}
+
+	@Test
+	public void testFindConnectingAddressWhenGetLocalHostThrows() throws Exception {
+		PowerMockito.mockStatic(InetAddress.class);
+		Mockito.when(InetAddress.getLocalHost()).thenThrow(new UnknownHostException()).thenCallRealMethod();
+
+		final InetAddress loopbackAddress = Inet4Address.getByName("127.0.0.1");
+		Thread socketServerThread;
+		try (ServerSocket socket = new ServerSocket(0, 1, loopbackAddress)) {
+			// Make sure that the thread will eventually die even if something else goes wrong
+			socket.setSoTimeout(10_000);
+			socketServerThread = new Thread(new Runnable() {
+				@Override
+				public void run() {
+					try {
+						socket.accept();
+					} catch (IOException e) {
+						// ignore
+					}
+				}
+			});
+			socketServerThread.start();
+
+			final InetSocketAddress socketAddress = new InetSocketAddress(loopbackAddress, socket.getLocalPort());
+			final InetAddress address = ConnectionUtils.findConnectingAddress(
+				socketAddress, 2000, 400);
+
+			PowerMockito.verifyStatic();
+			// Make sure we got an address via alternative means
+			assertNotNull(address);
+		}
+	}
 }