You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/02 19:43:55 UTC
[2/4] flink git commit: [FLINK-1608] [taskmanager] Network address
selection makes multiple attempts before switching to heuristics.
[FLINK-1608] [taskmanager] Network address selection makes multiple attempts before switching to heuristics.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/861ebe75
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/861ebe75
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/861ebe75
Branch: refs/heads/master
Commit: 861ebe753ff982b4cbf7c3c5b8c43fa306ac89f0
Parents: 933609f
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Feb 25 13:51:03 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Mar 2 18:36:47 2015 +0100
----------------------------------------------------------------------
.../org/apache/flink/runtime/net/NetUtils.java | 253 ++++++++++++++++---
.../flink/runtime/jobmanager/JobManager.scala | 8 +-
.../flink/runtime/taskmanager/TaskManager.scala | 10 +-
.../apache/flink/runtime/net/NetUtilsTest.java | 69 +++++
4 files changed, 303 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/861ebe75/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
index 8e0a41a..94073db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
@@ -31,73 +31,84 @@ import java.util.Enumeration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Utilities to determine the network interface and address that should be used to bind the
+ * TaskManager communication to.
+ */
public class NetUtils {
private static final Logger LOG = LoggerFactory.getLogger(NetUtils.class);
+ private static final long MIN_SLEEP_TIME = 50;
+ private static final long MAX_SLEEP_TIME = 20000;
+
/**
* The states of address detection mechanism.
* There is only a state transition if the current state failed to determine the address.
*/
private enum AddressDetectionState {
- ADDRESS(50), //detect own IP based on the JobManagers IP address. Look for common prefix
- FAST_CONNECT(50), //try to connect to the JobManager on all Interfaces and all their addresses.
- //this state uses a low timeout (say 50 ms) for fast detection.
- SLOW_CONNECT(1000), //same as FAST_CONNECT, but with a timeout of 1000 ms (1s).
+ /** Detect own IP address based on the target IP address. Look for common prefix */
+ ADDRESS(50),
+ /** Try to connect on all Interfaces and all their addresses with a low timeout */
+ FAST_CONNECT(50),
+ /** Try to connect on all Interfaces and all their addresses with a long timeout */
+ SLOW_CONNECT(1000),
+ /** Choose any non-loopback address */
HEURISTIC(0);
-
private int timeout;
+
AddressDetectionState(int timeout) {
this.timeout = timeout;
}
+
public int getTimeout() {
return timeout;
}
}
/**
- * Find out the TaskManager's own IP address.
+ * Find out the TaskManager's own IP address, simple version.
*/
public static InetAddress resolveAddress(InetSocketAddress jobManagerAddress) throws IOException {
AddressDetectionState strategy = jobManagerAddress != null ? AddressDetectionState.ADDRESS: AddressDetectionState.HEURISTIC;
while (true) {
Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
-
+
while (e.hasMoreElements()) {
NetworkInterface n = e.nextElement();
Enumeration<InetAddress> ee = n.getInetAddresses();
-
+
while (ee.hasMoreElements()) {
InetAddress i = ee.nextElement();
-
+
switch (strategy) {
case ADDRESS:
if (hasCommonPrefix(jobManagerAddress.getAddress().getAddress(), i.getAddress())) {
- if (tryToConnect(i, jobManagerAddress, strategy.getTimeout())) {
+ if (tryToConnect(i, jobManagerAddress, strategy.getTimeout(), true)) {
LOG.info("Determined {} as the machine's own IP address", i);
return i;
}
}
break;
-
+
case FAST_CONNECT:
case SLOW_CONNECT:
- boolean correct = tryToConnect(i, jobManagerAddress, strategy.getTimeout());
+ boolean correct = tryToConnect(i, jobManagerAddress, strategy.getTimeout(), true);
if (correct) {
LOG.info("Determined {} as the machine's own IP address", i);
return i;
}
break;
-
+
case HEURISTIC:
if (LOG.isDebugEnabled()) {
LOG.debug("ResolveAddress using heuristic strategy for " + i + " with" +
" isLinkLocalAddress:" + i.isLinkLocalAddress() +
" isLoopbackAddress:" + i.isLoopbackAddress() + ".");
}
-
+
if (!i.isLinkLocalAddress() && !i.isLoopbackAddress() && i instanceof Inet4Address){
LOG.warn("Hostname " + InetAddress.getLocalHost().getHostName() + " resolves to " +
"loopback address. Using instead " + i.getHostAddress() + " on network " +
@@ -105,7 +116,7 @@ public class NetUtils {
return i;
}
break;
-
+
default:
throw new RuntimeException("Unknown address detection strategy: " + strategy);
}
@@ -139,6 +150,171 @@ public class NetUtils {
}
/**
+ * Finds the local network address from which this machine can connect to the target
+ * address. This method tries to establish a proper network connection to the
+ * given target, so it only succeeds if the target socket address actually accepts
+ * connections. The method tries various strategies multiple times and uses an exponential
+ * backoff timer between tries.
+ * <p>
+ * If no connection attempt was successful after the given maximum time, the method
+ * will choose some address based on heuristics (excluding link-local and loopback addresses.)
+ * <p>
+ * This method will initially not log on info level (to not flood the log while the
+ * backoff time is still very low). It will start logging after a certain time
+ * has passed.
+ *
+ * @param targetAddress The address that the method tries to connect to.
+ * @param maxWaitMillis The maximum time that this method tries to connect, before falling
+ * back to the heuristics.
+ * @param startLoggingAfter The time after which the method will log on INFO level.
+ */
+ public static InetAddress findConnectingAddress(InetSocketAddress targetAddress,
+ long maxWaitMillis, long startLoggingAfter) throws IOException
+ {
+ if (targetAddress == null) {
+ throw new NullPointerException("targetAddress must not be null");
+ }
+ if (maxWaitMillis <= 0) {
+ throw new IllegalArgumentException("Max wait time must be positive");
+ }
+
+ final long startTime = System.currentTimeMillis();
+
+ long currentSleepTime = MIN_SLEEP_TIME;
+ long elapsedTime = 0;
+
+ // loop while there is time left
+ while (elapsedTime < maxWaitMillis) {
+ AddressDetectionState strategy = AddressDetectionState.ADDRESS;
+
+ boolean logging = elapsedTime >= startLoggingAfter;
+ if (logging) {
+ LOG.info("Trying to connect to " + targetAddress);
+ }
+ // go over the strategies ADDRESS - FAST_CONNECT - SLOW_CONNECT
+ do {
+ InetAddress address = findAddressUsingStrategy(strategy, targetAddress, logging);
+ if (address != null) {
+ return address;
+ }
+
+ // pick the next strategy
+ switch (strategy) {
+ case ADDRESS:
+ strategy = AddressDetectionState.FAST_CONNECT;
+ break;
+ case FAST_CONNECT:
+ strategy = AddressDetectionState.SLOW_CONNECT;
+ break;
+ case SLOW_CONNECT:
+ strategy = null;
+ break;
+ default:
+ throw new RuntimeException("Unsupported strategy: " + strategy);
+ }
+ }
+ while (strategy != null);
+
+ // we have made a pass with all strategies over all interfaces
+ // sleep for a while before we make the next pass
+ elapsedTime = System.currentTimeMillis() - startTime;
+
+ long toWait = Math.min(maxWaitMillis - elapsedTime, currentSleepTime);
+ if (toWait > 0) {
+ if (logging) {
+ LOG.info("Could not connect. Waiting for {} msecs before next attempt", toWait);
+ } else {
+ LOG.debug("Could not connect. Waiting for {} msecs before next attempt", toWait);
+ }
+
+ try {
+ Thread.sleep(toWait);
+ }
+ catch (InterruptedException e) {
+ throw new IOException("Connection attempts have been interrupted.");
+ }
+ }
+
+ // increase the exponential backoff timer
+ currentSleepTime = Math.min(2 * currentSleepTime, MAX_SLEEP_TIME);
+ }
+
+ // our attempts timed out. use the heuristic fallback
+ LOG.warn("Could not connect to {}. Selecting a local address using heuristics.", targetAddress);
+ InetAddress heuristic = findAddressUsingStrategy(AddressDetectionState.HEURISTIC, targetAddress, true);
+ if (heuristic != null) {
+ return heuristic;
+ }
+ else {
+ LOG.warn("Could not find any IPv4 address that is not loopback or link-local. Using localhost address.");
+ return InetAddress.getLocalHost();
+ }
+ }
+
+
+ private static InetAddress findAddressUsingStrategy(AddressDetectionState strategy,
+ InetSocketAddress targetAddress,
+ boolean logging) throws IOException
+ {
+ final byte[] targetAddressBytes = targetAddress.getAddress().getAddress();
+
+ // for each network interface
+ Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
+ while (e.hasMoreElements()) {
+
+ NetworkInterface netInterface = e.nextElement();
+
+ // for each address of the network interface
+ Enumeration<InetAddress> ee = netInterface.getInetAddresses();
+ while (ee.hasMoreElements()) {
+ InetAddress interfaceAddress = ee.nextElement();
+
+ switch (strategy) {
+ case ADDRESS:
+ if (hasCommonPrefix(targetAddressBytes, interfaceAddress.getAddress())) {
+ LOG.debug("Target address {} and local address {} share prefix - trying to connect.",
+ targetAddress, interfaceAddress);
+
+ if (tryToConnect(interfaceAddress, targetAddress, strategy.getTimeout(), logging)) {
+ return interfaceAddress;
+ }
+ }
+ break;
+
+ case FAST_CONNECT:
+ case SLOW_CONNECT:
+ LOG.debug("Trying to connect to {} from local address {} with timeout {}",
+ targetAddress, interfaceAddress, strategy.getTimeout());
+
+ if (tryToConnect(interfaceAddress, targetAddress, strategy.getTimeout(), logging)) {
+ return interfaceAddress;
+ }
+ break;
+
+ case HEURISTIC:
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Checking address {} using heuristics: linkLocal: {} loopback: {}",
+ interfaceAddress, interfaceAddress.isLinkLocalAddress(),
+ interfaceAddress.isLoopbackAddress());
+ }
+ // pick a non-loopback non-link-local address
+ if (interfaceAddress instanceof Inet4Address && !interfaceAddress.isLinkLocalAddress() &&
+ !interfaceAddress.isLoopbackAddress())
+ {
+ return interfaceAddress;
+ }
+ break;
+
+ default:
+ throw new RuntimeException("Unsupported strategy: " + strategy);
+ }
+ } // end for each address of the interface
+ } // end for each interface
+
+ return null;
+ }
+
+ /**
* Checks if two addresses have a common prefix (first 2 bytes).
* Example: 192.168.???.???
* Works also with ipv6, but accepts probably too many addresses
@@ -147,35 +323,52 @@ public class NetUtils {
return address[0] == address2[0] && address[1] == address2[1];
}
- public static boolean tryToConnect(InetAddress fromAddress, SocketAddress toSocket, int timeout) throws IOException {
+ /**
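+ * Tries to open a connection to the given socket address from a socket bound to the given local address.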
+ * @param fromAddress The address to connect from.
+ * @param toSocket The socket address to connect to.
+ * @param timeout The timeout for the connection.
+ * @param logFailed Flag to indicate whether to log failed attempts on info level
+ * (failed attempts are always logged on DEBUG level).
+ * @return True, if the connection was successful, false otherwise.
+ * @throws IOException Thrown if the socket cleanup fails.
+ */
+ private static boolean tryToConnect(InetAddress fromAddress, SocketAddress toSocket,
+ int timeout, boolean logFailed) throws IOException
+ {
if (LOG.isDebugEnabled()) {
- LOG.debug("Trying to connect to JobManager (" + toSocket + ") from local address " + fromAddress
+ LOG.debug("Trying to connect to (" + toSocket + ") from local address " + fromAddress
+ " with timeout " + timeout);
}
- boolean connectable = true;
- Socket socket = null;
+ Socket socket = new Socket();
try {
- socket = new Socket();
- SocketAddress bindP = new InetSocketAddress(fromAddress, 0); // 0 = let the OS choose the port on this
+ // port 0 = let the OS choose the port
+ SocketAddress bindP = new InetSocketAddress(fromAddress, 0);
// machine
socket.bind(bindP);
socket.connect(toSocket, timeout);
- } catch (Exception ex) {
- LOG.info("Failed to connect to JobManager from address '" + fromAddress + "': " + ex.getMessage());
+ return true;
+ }
+ catch (Exception ex) {
+ String message = "Failed to connect from address '" + fromAddress + "': " + ex.getMessage();
if (LOG.isDebugEnabled()) {
- LOG.debug("Failed with exception", ex);
+ LOG.debug(message, ex);
+ } else if (logFailed) {
+ LOG.info(message);
}
- connectable = false;
+ return false;
}
finally {
- if (socket != null) {
- socket.close();
- }
+ socket.close();
}
- return connectable;
}
- public static final int getAvailablePort() {
+ /**
+ * Find a non-occupied port.
+ *
+ * @return A non-occupied port.
+ */
+ public static int getAvailablePort() {
for (int i = 0; i < 50; i++) {
ServerSocket serverSocket = null;
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/861ebe75/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 415a20c..85ebdce 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -756,8 +756,8 @@ object JobManager {
def parseArgs(args: Array[String]): (Configuration, ExecutionMode, String, Int) = {
val parser = new scopt.OptionParser[JobManagerCLIConfiguration]("JobManager") {
head("Flink JobManager")
- opt[String]("configDir") action { (arg, c) => c.copy(configDir = arg) } text ("Specify " +
- "the configuration directory.")
+ opt[String]("configDir") action { (arg, c) => c.copy(configDir = arg) } text (
+ "The configuration directory.")
opt[String]("executionMode") optional() action { (arg, c) =>
if(arg.equals("local")){
c.copy(executionMode = LOCAL)
@@ -765,7 +765,7 @@ object JobManager {
c.copy(executionMode = CLUSTER)
}
} text {
- "Specify the execution mode of the JobManager (CLUSTER / LOCAL)"
+ "The execution mode of the JobManager (CLUSTER / LOCAL)"
}
}
@@ -785,7 +785,7 @@ object JobManager {
(configuration, config.executionMode, hostname, port)
} getOrElse {
- throw new Exception("Invalid command line arguments. Usage: " + parser.usage)
+ throw new Exception("Invalid command line arguments: " + parser.usage)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/861ebe75/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 7011eb4..182f8cf 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -102,7 +102,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
log.info("Starting task manager at {}.", self.path)
log.info("Creating {} task slot(s).", numberOfSlots)
- log.info("TaskManager connection information {}.", connectionInfo)
+ log.info("TaskManager connection information: {}", connectionInfo)
val HEARTBEAT_INTERVAL = 5000 millisecond
@@ -843,10 +843,14 @@ object TaskManager {
else {
// try to find out the hostname of the interface from which the TaskManager
// can connect to the JobManager. This involves a reverse name lookup
- LOG.info("Trying to select the network interface and address/hostname to use")
+ LOG.info("Trying to select the network interface and address to use " +
+ "by connecting to the configured JobManager")
+
val jobManagerAddress = new InetSocketAddress(jobManagerHostname, jobManagerPort)
taskManagerHostname = try {
- NetUtils.resolveAddress(jobManagerAddress).getHostName()
+ // try to get the address for up to two minutes and start
+ // logging only after ten seconds
+ NetUtils.findConnectingAddress(jobManagerAddress, 120000, 10000).getHostName()
}
catch {
case t: Throwable => throw new Exception("TaskManager cannot find a network interface " +
http://git-wip-us.apache.org/repos/asf/flink/blob/861ebe75/flink-runtime/src/test/java/org/apache/flink/runtime/net/NetUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/NetUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/NetUtilsTest.java
new file mode 100644
index 0000000..3794fa0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/NetUtilsTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.net;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+/**
+ * Tests for the network utilities.
+ */
+public class NetUtilsTest {
+
+ @Test
+ public void testFindConnectableAddress() {
+ int unusedPort;
+ try {
+ unusedPort = NetUtils.getAvailablePort();
+ }
+ catch (Throwable t) {
+ // if this system cannot find an available port,
+ // skip this test
+ return;
+ }
+
+ try {
+ // create an unreachable target address
+ InetSocketAddress unreachable = new InetSocketAddress("localhost", unusedPort);
+
+ final long start = System.currentTimeMillis();
+ InetAddress add = NetUtils.findConnectingAddress(unreachable, 2000, 400);
+
+ // check that it did not take forever
+ assertTrue(System.currentTimeMillis() - start < 8000);
+
+ // we should have found a heuristic address
+ assertNotNull(add);
+
+ // these checks are desirable, but will not work on every machine
+ // such as machines with no connected network media, which may
+ // default to a link local address
+ // assertFalse(add.isLinkLocalAddress());
+ // assertFalse(add.isLoopbackAddress());
+ // assertFalse(add.isAnyLocalAddress());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}