You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/11/17 15:39:21 UTC
[6/6] flink git commit: [FLINK-2967] Increase timeout for LOCAL_HOST
address detection strategy, give the local host address a higher priority
[FLINK-2967] Increase timeout for LOCAL_HOST address detection strategy, give the local host address a higher priority
This closes #1391
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a45212d2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a45212d2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a45212d2
Branch: refs/heads/master
Commit: a45212d2b18cc12e3c314ed43e8d19943693deed
Parents: ffb8aec
Author: Robert Metzger <rm...@apache.org>
Authored: Mon Nov 16 21:26:57 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Nov 17 15:38:06 2015 +0100
----------------------------------------------------------------------
.../flink/runtime/net/ConnectionUtils.java | 58 +++++++++++++++++---
1 file changed, 51 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a45212d2/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
index 542e69e..d53e0fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
@@ -32,8 +32,10 @@ import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import scala.concurrent.duration.FiniteDuration;
/**
@@ -53,7 +55,7 @@ public class ConnectionUtils {
*/
private enum AddressDetectionState {
/** Connect from interface returned by InetAddress.getLocalHost() **/
- LOCAL_HOST(50),
+ LOCAL_HOST(200),
/** Detect own IP address based on the target IP address. Look for common prefix */
ADDRESS(50),
/** Try to connect on all Interfaces and all their addresses with a low timeout */
@@ -63,7 +65,7 @@ public class ConnectionUtils {
/** Choose any non-loopback address */
HEURISTIC(0);
- private int timeout;
+ private final int timeout;
AddressDetectionState(int timeout) {
this.timeout = timeout;
@@ -180,17 +182,59 @@ public class ConnectionUtils {
}
}
+ /**
+ * This utility method tries to connect to the JobManager using the InetAddress returned by
+ * InetAddress.getLocalHost(). The purpose of the utility is to have a final try connecting to
+ * the target address using the LocalHost before using the address returned.
+ * We do a second try because the JM might have been unavailable during the first check.
+ *
+ * @param preliminaryResult The address detected by the heuristic
+ * @return either the preliminaryResult or the address returned by InetAddress.getLocalHost() (if
+ * we are able to connect to targetAddress from there)
+ */
+ private static InetAddress tryLocalHostBeforeReturning(
+ InetAddress preliminaryResult, SocketAddress targetAddress, boolean logging) throws IOException {
+
+ InetAddress localhostName = InetAddress.getLocalHost();
+
+ if (preliminaryResult.equals(localhostName)) {
+ // preliminary result is equal to the local host name
+ return preliminaryResult;
+ }
+ else if (tryToConnect(localhostName, targetAddress, AddressDetectionState.SLOW_CONNECT.getTimeout(), logging)) {
+ // success, we were able to use local host to connect
+ LOG.debug("Preferring {} (InetAddress.getLocalHost()) for local bind point over previous candidate {}",
+ localhostName, preliminaryResult);
+ return localhostName;
+ }
+ else {
+ // we have to make the preliminary result the final result
+ return preliminaryResult;
+ }
+ }
+ /**
+ * Try to find a local address which allows us to connect to the targetAddress using the given
+ * strategy
+ *
+ * @param strategy Depending on the strategy, the method will enumerate all interfaces, trying to connect
+ * to the target address
+ * @param targetAddress The address we try to connect to
+ * @param logging Boolean indicating the logging verbosity
+ * @return null if we could not find an address using this strategy, otherwise, the local address.
+ * @throws IOException
+ */
private static InetAddress findAddressUsingStrategy(AddressDetectionState strategy,
InetSocketAddress targetAddress,
boolean logging) throws IOException
{
// try LOCAL_HOST strategy independent of the network interfaces
- if(strategy == AddressDetectionState.LOCAL_HOST) {
+ if (strategy == AddressDetectionState.LOCAL_HOST) {
InetAddress localhostName = InetAddress.getLocalHost();
- if(tryToConnect(localhostName, targetAddress, strategy.getTimeout(), logging)) {
+ if (tryToConnect(localhostName, targetAddress, strategy.getTimeout(), logging)) {
LOG.debug("Using InetAddress.getLocalHost() immediately for the connecting address");
+ // Here, we are not calling tryLocalHostBeforeReturning() because it is the LOCAL_HOST strategy
return localhostName;
} else {
return null;
@@ -217,7 +261,7 @@ public class ConnectionUtils {
targetAddress, interfaceAddress);
if (tryToConnect(interfaceAddress, targetAddress, strategy.getTimeout(), logging)) {
- return interfaceAddress;
+ return tryLocalHostBeforeReturning(interfaceAddress, targetAddress, logging);
}
}
break;
@@ -228,7 +272,7 @@ public class ConnectionUtils {
targetAddress, interfaceAddress, strategy.getTimeout());
if (tryToConnect(interfaceAddress, targetAddress, strategy.getTimeout(), logging)) {
- return interfaceAddress;
+ return tryLocalHostBeforeReturning(interfaceAddress, targetAddress, logging);
}
break;
@@ -242,7 +286,7 @@ public class ConnectionUtils {
if (interfaceAddress instanceof Inet4Address && !interfaceAddress.isLinkLocalAddress() &&
!interfaceAddress.isLoopbackAddress())
{
- return interfaceAddress;
+ return tryLocalHostBeforeReturning(interfaceAddress, targetAddress, logging);
}
break;