You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/11/17 15:39:21 UTC

[6/6] flink git commit: [FLINK-2967] Increase timeout for LOCAL_HOST address detection strategy, give the local host address a higher priority

[FLINK-2967] Increase timeout for LOCAL_HOST address detection strategy, give the local host address a higher priority

This closes #1391


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a45212d2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a45212d2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a45212d2

Branch: refs/heads/master
Commit: a45212d2b18cc12e3c314ed43e8d19943693deed
Parents: ffb8aec
Author: Robert Metzger <rm...@apache.org>
Authored: Mon Nov 16 21:26:57 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Nov 17 15:38:06 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/net/ConnectionUtils.java      | 58 +++++++++++++++++---
 1 file changed, 51 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a45212d2/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
index 542e69e..d53e0fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
@@ -32,8 +32,10 @@ import java.util.concurrent.TimeUnit;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -53,7 +55,7 @@ public class ConnectionUtils {
 	 */
 	private enum AddressDetectionState {
 		/** Connect from interface returned by InetAddress.getLocalHost() **/
-		LOCAL_HOST(50),
+		LOCAL_HOST(200),
 		/** Detect own IP address based on the target IP address. Look for common prefix */
 		ADDRESS(50),
 		/** Try to connect on all Interfaces and all their addresses with a low timeout */
@@ -63,7 +65,7 @@ public class ConnectionUtils {
 		/** Choose any non-loopback address */
 		HEURISTIC(0);
 
-		private int timeout;
+		private final int timeout;
 
 		AddressDetectionState(int timeout) {
 			this.timeout = timeout;
@@ -180,17 +182,59 @@ public class ConnectionUtils {
 		}
 	}
 
+	/**
+	 * This utility method tries to connect to the JobManager using the InetAddress returned by
+	 * InetAddress.getLocalHost(). The purpose of the utility is to have a final try connecting to
+	 * the target address using the LocalHost before returning the preliminary result.
+	 * We do a second try because the JM might have been unavailable during the first check.
+	 *
+	 * @param preliminaryResult The address detected by the heuristic
+	 * @return either the preliminaryResult or the address returned by InetAddress.getLocalHost() (if
+	 * 			we are able to connect to targetAddress from there)
+	 */
+	private static InetAddress tryLocalHostBeforeReturning(
+				InetAddress preliminaryResult, SocketAddress targetAddress, boolean logging) throws IOException {
+		
+		InetAddress localhostName = InetAddress.getLocalHost();
+		
+		if (preliminaryResult.equals(localhostName)) {
+			// preliminary result is equal to the local host name 
+			return preliminaryResult;
+		}
+		else if (tryToConnect(localhostName, targetAddress, AddressDetectionState.SLOW_CONNECT.getTimeout(), logging)) {
+			// success, we were able to use local host to connect
+			LOG.debug("Preferring {} (InetAddress.getLocalHost()) for local bind point over previous candidate {}",
+					localhostName, preliminaryResult);
+			return localhostName;
+		}
+		else {
+			// we have to make the preliminary result the final result
+			return preliminaryResult;
+		}
+	}
 
+	/**
+	 * Try to find a local address which allows us to connect to the targetAddress using the given
+	 * strategy.
+	 *
+	 * @param strategy Depending on the strategy, the method will enumerate all interfaces, trying to connect
+	 *                 to the target address
+	 * @param targetAddress The address we try to connect to
+	 * @param logging Boolean indicating the logging verbosity
+	 * @return null if we could not find an address using this strategy, otherwise, the local address.
+	 * @throws IOException
+	 */
 	private static InetAddress findAddressUsingStrategy(AddressDetectionState strategy,
 														InetSocketAddress targetAddress,
 														boolean logging) throws IOException
 	{
 		// try LOCAL_HOST strategy independent of the network interfaces
-		if(strategy == AddressDetectionState.LOCAL_HOST) {
+		if (strategy == AddressDetectionState.LOCAL_HOST) {
 			InetAddress localhostName = InetAddress.getLocalHost();
 
-			if(tryToConnect(localhostName, targetAddress, strategy.getTimeout(), logging)) {
+			if (tryToConnect(localhostName, targetAddress, strategy.getTimeout(), logging)) {
 				LOG.debug("Using InetAddress.getLocalHost() immediately for the connecting address");
+				// Here, we are not calling tryLocalHostBeforeReturning() because it is the LOCAL_HOST strategy
 				return localhostName;
 			} else {
 				return null;
@@ -217,7 +261,7 @@ public class ConnectionUtils {
 										targetAddress, interfaceAddress);
 
 							if (tryToConnect(interfaceAddress, targetAddress, strategy.getTimeout(), logging)) {
-								return interfaceAddress;
+								return tryLocalHostBeforeReturning(interfaceAddress, targetAddress, logging);
 							}
 						}
 						break;
@@ -228,7 +272,7 @@ public class ConnectionUtils {
 								targetAddress, interfaceAddress, strategy.getTimeout());
 
 						if (tryToConnect(interfaceAddress, targetAddress, strategy.getTimeout(), logging)) {
-							return interfaceAddress;
+							return tryLocalHostBeforeReturning(interfaceAddress, targetAddress, logging);
 						}
 						break;
 
@@ -242,7 +286,7 @@ public class ConnectionUtils {
 						if (interfaceAddress instanceof Inet4Address && !interfaceAddress.isLinkLocalAddress() &&
 								!interfaceAddress.isLoopbackAddress())
 						{
-							return interfaceAddress;
+							return tryLocalHostBeforeReturning(interfaceAddress, targetAddress, logging);
 						}
 						break;