You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/02 19:43:55 UTC

[2/4] flink git commit: [FLINK-1608] [taskmanager] Network address selection makes multiple attempts before switching to heuristics.

[FLINK-1608] [taskmanager] Network address selection makes multiple attempts before switching to heuristics.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/861ebe75
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/861ebe75
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/861ebe75

Branch: refs/heads/master
Commit: 861ebe753ff982b4cbf7c3c5b8c43fa306ac89f0
Parents: 933609f
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Feb 25 13:51:03 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Mar 2 18:36:47 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/runtime/net/NetUtils.java  | 253 ++++++++++++++++---
 .../flink/runtime/jobmanager/JobManager.scala   |   8 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  10 +-
 .../apache/flink/runtime/net/NetUtilsTest.java  |  69 +++++
 4 files changed, 303 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/861ebe75/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
index 8e0a41a..94073db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
@@ -31,73 +31,84 @@ import java.util.Enumeration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Utilities to determine the network interface and address that should be used to bind the
+ * TaskManager communication to.
+ */
 public class NetUtils {
 	
 	private static final Logger LOG = LoggerFactory.getLogger(NetUtils.class);
 
+	private static final long MIN_SLEEP_TIME = 50;
+	private static final long MAX_SLEEP_TIME = 20000;
+
 	/**
 	 * The states of address detection mechanism.
 	 * There is only a state transition if the current state failed to determine the address.
 	 */
 	private enum AddressDetectionState {
-		ADDRESS(50), 		//detect own IP based on the JobManagers IP address. Look for common prefix
-		FAST_CONNECT(50),	//try to connect to the JobManager on all Interfaces and all their addresses.
-		//this state uses a low timeout (say 50 ms) for fast detection.
-		SLOW_CONNECT(1000),	//same as FAST_CONNECT, but with a timeout of 1000 ms (1s).
+		/** Detect own IP address based on the target IP address. Look for common prefix */
+		ADDRESS(50),
+		/** Try to connect on all Interfaces and all their addresses with a low timeout */
+		FAST_CONNECT(50),
+		/** Try to connect on all Interfaces and all their addresses with a long timeout */
+		SLOW_CONNECT(1000),
+		/** Choose any non-loopback address */
 		HEURISTIC(0);
 
-
 		private int timeout;
+
 		AddressDetectionState(int timeout) {
 			this.timeout = timeout;
 		}
+
 		public int getTimeout() {
 			return timeout;
 		}
 	}
 
 	/**
-	 * Find out the TaskManager's own IP address.
+	 * Find out the TaskManager's own IP address, simple version.
 	 */
 	public static InetAddress resolveAddress(InetSocketAddress jobManagerAddress) throws IOException {
 		AddressDetectionState strategy = jobManagerAddress != null ? AddressDetectionState.ADDRESS: AddressDetectionState.HEURISTIC;
 
 		while (true) {
 			Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
-			
+
 			while (e.hasMoreElements()) {
 				NetworkInterface n = e.nextElement();
 				Enumeration<InetAddress> ee = n.getInetAddresses();
-				
+
 				while (ee.hasMoreElements()) {
 					InetAddress i = ee.nextElement();
-					
+
 					switch (strategy) {
 						case ADDRESS:
 							if (hasCommonPrefix(jobManagerAddress.getAddress().getAddress(), i.getAddress())) {
-								if (tryToConnect(i, jobManagerAddress, strategy.getTimeout())) {
+								if (tryToConnect(i, jobManagerAddress, strategy.getTimeout(), true)) {
 									LOG.info("Determined {} as the machine's own IP address", i);
 									return i;
 								}
 							}
 							break;
-							
+
 						case FAST_CONNECT:
 						case SLOW_CONNECT:
-							boolean correct = tryToConnect(i, jobManagerAddress, strategy.getTimeout());
+							boolean correct = tryToConnect(i, jobManagerAddress, strategy.getTimeout(), true);
 							if (correct) {
 								LOG.info("Determined {} as the machine's own IP address", i);
 								return i;
 							}
 							break;
-							
+
 						case HEURISTIC:
 							if (LOG.isDebugEnabled()) {
 								LOG.debug("ResolveAddress using heuristic strategy for " + i + " with" +
 										" isLinkLocalAddress:" + i.isLinkLocalAddress() +
 										" isLoopbackAddress:" + i.isLoopbackAddress() + ".");
 							}
-							
+
 							if (!i.isLinkLocalAddress() && !i.isLoopbackAddress() && i instanceof Inet4Address){
 								LOG.warn("Hostname " + InetAddress.getLocalHost().getHostName() + " resolves to " +
 										"loopback address. Using instead " + i.getHostAddress() + " on network " +
@@ -105,7 +116,7 @@ public class NetUtils {
 								return i;
 							}
 							break;
-							
+
 						default:
 							throw new RuntimeException("Unknown address detection strategy: " + strategy);
 					}
@@ -139,6 +150,171 @@ public class NetUtils {
 	}
 
 	/**
+	 * Finds the local network address from which this machine can connect to the target
+	 * address. This method tries to establish a proper network connection to the
+	 * given target, so it only succeeds if the target socket address actually accepts
+	 * connections. The method tries various strategies multiple times and uses an exponential
+	 * backoff timer between tries.
+	 * <p>
+	 * If no connection attempt was successful after the given maximum time, the method
+	 * will choose some address based on heuristics (excluding link-local and loopback addresses).
+	 * <p>
+	 * This method will initially not log on info level (to not flood the log while the
+	 * backoff time is still very low). It will start logging after a certain time
+	 * has passed.
+	 *
+	 * @param targetAddress The address that the method tries to connect to.
+	 * @param maxWaitMillis The maximum time that this method tries to connect, before falling
+	 *                       back to the heuristics.
+	 * @param startLoggingAfter The time after which the method will log on INFO level.
+	 */
+	public static InetAddress findConnectingAddress(InetSocketAddress targetAddress,
+							long maxWaitMillis, long startLoggingAfter) throws IOException
+	{
+		if (targetAddress == null) {
+			throw new NullPointerException("targetAddress must not be null");
+		}
+		if (maxWaitMillis <= 0) {
+			throw new IllegalArgumentException("Max wait time must be positive");
+		}
+
+		final long startTime = System.currentTimeMillis();
+
+		long currentSleepTime = MIN_SLEEP_TIME;
+		long elapsedTime = 0;
+
+		// loop while there is time left
+		while (elapsedTime < maxWaitMillis) {
+			AddressDetectionState strategy = AddressDetectionState.ADDRESS;
+
+			boolean logging = elapsedTime >= startLoggingAfter;
+			if (logging) {
+				LOG.info("Trying to connect to " + targetAddress);
+			}
+			// go over the strategies ADDRESS - FAST_CONNECT - SLOW_CONNECT
+			do {
+				InetAddress address = findAddressUsingStrategy(strategy, targetAddress, logging);
+				if (address != null) {
+					return address;
+				}
+
+				// pick the next strategy
+				switch (strategy) {
+					case ADDRESS:
+						strategy = AddressDetectionState.FAST_CONNECT;
+						break;
+					case FAST_CONNECT:
+						strategy = AddressDetectionState.SLOW_CONNECT;
+						break;
+					case SLOW_CONNECT:
+						strategy = null;
+						break;
+					default:
+						throw new RuntimeException("Unsupported strategy: " + strategy);
+				}
+			}
+			while (strategy != null);
+
+			// we have made a pass with all strategies over all interfaces
+			// sleep for a while before we make the next pass
+			elapsedTime = System.currentTimeMillis() - startTime;
+
+			long toWait = Math.min(maxWaitMillis - elapsedTime, currentSleepTime);
+			if (toWait > 0) {
+				if (logging) {
+					LOG.info("Could not connect. Waiting for {} msecs before next attempt", toWait);
+				} else {
+					LOG.debug("Could not connect. Waiting for {} msecs before next attempt", toWait);
+				}
+
+				try {
+					Thread.sleep(toWait);
+				}
+				catch (InterruptedException e) {
+					throw new IOException("Connection attempts have been interrupted.");
+				}
+			}
+
+			// increase the exponential backoff timer
+			currentSleepTime = Math.min(2 * currentSleepTime, MAX_SLEEP_TIME);
+		}
+
+		// our attempts timed out. use the heuristic fallback
+		LOG.warn("Could not connect to {}. Selecting a local address using heuristics.", targetAddress);
+		InetAddress heuristic = findAddressUsingStrategy(AddressDetectionState.HEURISTIC, targetAddress, true);
+		if (heuristic != null) {
+			return heuristic;
+		}
+		else {
+			LOG.warn("Could not find any IPv4 address that is not loopback or link-local. Using localhost address.");
+			return InetAddress.getLocalHost();
+		}
+	}
+
+
+	private static InetAddress findAddressUsingStrategy(AddressDetectionState strategy,
+														InetSocketAddress targetAddress,
+														boolean logging) throws IOException
+	{
+		final byte[] targetAddressBytes = targetAddress.getAddress().getAddress();
+
+		// for each network interface
+		Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
+		while (e.hasMoreElements()) {
+
+			NetworkInterface netInterface = e.nextElement();
+
+			// for each address of the network interface
+			Enumeration<InetAddress> ee = netInterface.getInetAddresses();
+			while (ee.hasMoreElements()) {
+				InetAddress interfaceAddress = ee.nextElement();
+
+				switch (strategy) {
+					case ADDRESS:
+						if (hasCommonPrefix(targetAddressBytes, interfaceAddress.getAddress())) {
+							LOG.debug("Target address {} and local address {} share prefix - trying to connect.",
+										targetAddress, interfaceAddress);
+
+							if (tryToConnect(interfaceAddress, targetAddress, strategy.getTimeout(), logging)) {
+								return interfaceAddress;
+							}
+						}
+						break;
+
+					case FAST_CONNECT:
+					case SLOW_CONNECT:
+						LOG.debug("Trying to connect to {} from local address {} with timeout {}",
+								targetAddress, interfaceAddress, strategy.getTimeout());
+
+						if (tryToConnect(interfaceAddress, targetAddress, strategy.getTimeout(), logging)) {
+							return interfaceAddress;
+						}
+						break;
+
+					case HEURISTIC:
+						if (LOG.isDebugEnabled()) {
+							LOG.debug("Checking address {} using heuristics: linkLocal: {} loopback: {}",
+									interfaceAddress, interfaceAddress.isLinkLocalAddress(),
+									interfaceAddress.isLoopbackAddress());
+						}
+						// pick a non-loopback non-link-local address
+						if (interfaceAddress instanceof Inet4Address && !interfaceAddress.isLinkLocalAddress() &&
+								!interfaceAddress.isLoopbackAddress())
+						{
+							return interfaceAddress;
+						}
+						break;
+
+					default:
+						throw new RuntimeException("Unsupported strategy: " + strategy);
+				}
+			} // end for each address of the interface
+		} // end for each interface
+
+		return null;
+	}
+
+	/**
 	 * Checks if two addresses have a common prefix (first 2 bytes).
 	 * Example: 192.168.???.???
 	 * Works also with ipv6, but accepts probably too many addresses
@@ -147,35 +323,52 @@ public class NetUtils {
 		return address[0] == address2[0] && address[1] == address2[1];
 	}
 
-	public static boolean tryToConnect(InetAddress fromAddress, SocketAddress toSocket, int timeout) throws IOException {
+	/**
+	 * Binds a socket to the given local address and tries to connect it to the target socket address.
+	 * @param fromAddress The address to connect from.
+	 * @param toSocket The socket address to connect to.
+	 * @param timeout The timeout for the connection.
+	 * @param logFailed Flag to indicate whether to log failed attempts on info level
+	 *                  (failed attempts are always logged on DEBUG level).
+	 * @return True, if the connection was successful, false otherwise.
+	 * @throws IOException Thrown if the socket cleanup fails.
+	 */
+	private static boolean tryToConnect(InetAddress fromAddress, SocketAddress toSocket,
+										int timeout, boolean logFailed) throws IOException
+	{
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("Trying to connect to JobManager (" + toSocket + ") from local address " + fromAddress
+			LOG.debug("Trying to connect to (" + toSocket + ") from local address " + fromAddress
 					+ " with timeout " + timeout);
 		}
-		boolean connectable = true;
-		Socket socket = null;
+		Socket socket = new Socket();
 		try {
-			socket = new Socket();
-			SocketAddress bindP = new InetSocketAddress(fromAddress, 0); // 0 = let the OS choose the port on this
+			// port 0 = let the OS choose the port
+			SocketAddress bindP = new InetSocketAddress(fromAddress, 0);
 			// machine
 			socket.bind(bindP);
 			socket.connect(toSocket, timeout);
-		} catch (Exception ex) {
-			LOG.info("Failed to connect to JobManager from address '" + fromAddress + "': " + ex.getMessage());
+			return true;
+		}
+		catch (Exception ex) {
+			String message = "Failed to connect from address '" + fromAddress + "': " + ex.getMessage();
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("Failed with exception", ex);
+				LOG.debug(message, ex);
+			} else if (logFailed) {
+				LOG.info(message);
 			}
-			connectable = false;
+			return false;
 		}
 		finally {
-			if (socket != null) {
-				socket.close();
-			}
+			socket.close();
 		}
-		return connectable;
 	}
 
-	public static final int getAvailablePort() {
+	/**
+	 * Find a non-occupied port.
+	 *
+	 * @return A non-occupied port.
+	 */
+	public static int getAvailablePort() {
 		for (int i = 0; i < 50; i++) {
 			ServerSocket serverSocket = null;
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/861ebe75/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 415a20c..85ebdce 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -756,8 +756,8 @@ object JobManager {
   def parseArgs(args: Array[String]): (Configuration, ExecutionMode, String, Int) = {
     val parser = new scopt.OptionParser[JobManagerCLIConfiguration]("JobManager") {
       head("Flink JobManager")
-      opt[String]("configDir") action { (arg, c) => c.copy(configDir = arg) } text ("Specify " +
-        "the configuration directory.")
+      opt[String]("configDir") action { (arg, c) => c.copy(configDir = arg) } text (
+        "The configuration directory.")
       opt[String]("executionMode") optional() action { (arg, c) =>
         if(arg.equals("local")){
           c.copy(executionMode = LOCAL)
@@ -765,7 +765,7 @@ object JobManager {
           c.copy(executionMode = CLUSTER)
         }
       } text {
-        "Specify the execution mode of the JobManager (CLUSTER / LOCAL)"
+        "The execution mode of the JobManager (CLUSTER / LOCAL)"
       }
     }
 
@@ -785,7 +785,7 @@ object JobManager {
 
         (configuration, config.executionMode, hostname, port)
     } getOrElse {
-      throw new Exception("Invalid command line arguments. Usage: " + parser.usage)
+      throw new Exception("Invalid command line arguments: " + parser.usage)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/861ebe75/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 7011eb4..182f8cf 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -102,7 +102,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
 
   log.info("Starting task manager at {}.", self.path)
   log.info("Creating {} task slot(s).", numberOfSlots)
-  log.info("TaskManager connection information {}.", connectionInfo)
+  log.info("TaskManager connection information: {}", connectionInfo)
 
   val HEARTBEAT_INTERVAL = 5000 millisecond
 
@@ -843,10 +843,14 @@ object TaskManager {
     else {
       // try to find out the hostname of the interface from which the TaskManager
       // can connect to the JobManager. This involves a reverse name lookup
-      LOG.info("Trying to select the network interface and address/hostname to use")
+      LOG.info("Trying to select the network interface and address to use " +
+        "by connecting to the configured JobManager")
+
       val jobManagerAddress = new InetSocketAddress(jobManagerHostname, jobManagerPort)
       taskManagerHostname = try {
-        NetUtils.resolveAddress(jobManagerAddress).getHostName()
+        // try to get the address for up to two minutes and start
+        // logging only after ten seconds
+        NetUtils.findConnectingAddress(jobManagerAddress, 120000, 10000).getHostName()
       }
       catch {
         case t: Throwable => throw new Exception("TaskManager cannot find a network interface " +

http://git-wip-us.apache.org/repos/asf/flink/blob/861ebe75/flink-runtime/src/test/java/org/apache/flink/runtime/net/NetUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/NetUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/NetUtilsTest.java
new file mode 100644
index 0000000..3794fa0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/NetUtilsTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.net;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+/**
+ * Tests for the network utilities.
+ */
+public class NetUtilsTest {
+
+	@Test
+	public void testFindConnectableAddress() {
+		int unusedPort;
+		try {
+			unusedPort = NetUtils.getAvailablePort();
+		}
+		catch (Throwable t) {
+			// if this system cannot find an available port,
+			// skip this test
+			return;
+		}
+
+		try {
+			// create an unreachable target address
+			InetSocketAddress unreachable = new InetSocketAddress("localhost", unusedPort);
+
+			final long start = System.currentTimeMillis();
+			InetAddress add = NetUtils.findConnectingAddress(unreachable, 2000, 400);
+
+			// check that it did not take forever
+			assertTrue(System.currentTimeMillis() - start < 8000);
+
+			// we should have found a heuristic address
+			assertNotNull(add);
+
+			// these checks are desirable, but will not work on every machine
+			// such as machines with no connected network media, which may
+			// default to a link local address
+			// assertFalse(add.isLinkLocalAddress());
+			// assertFalse(add.isLoopbackAddress());
+			// assertFalse(add.isAnyLocalAddress());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}