You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/02 19:43:54 UTC

[1/4] flink git commit: [FLINK-1626] [tests] Fix spurious failure in MatchTaskTest

Repository: flink
Updated Branches:
  refs/heads/master 086209cae -> e038629ee


[FLINK-1626] [tests] Fix spurious failure in MatchTaskTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e038629e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e038629e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e038629e

Branch: refs/heads/master
Commit: e038629eecf45b2f0445ba60603467dafc565aae
Parents: caa2941
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Mar 2 17:41:41 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Mar 2 18:36:47 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/operators/MatchTaskTest.java  | 172 ++++++++++---------
 1 file changed, 94 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e038629e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
index 553212a..16aea69 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
@@ -756,94 +756,110 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 	
 	@Test
 	public void testCancelHashMatchTaskWhileBuildFirst() {
-		int keyCnt = 20;
-		int valCnt = 20;
-		
-		addInput(new DelayingInfinitiveInputIterator(100));
-		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-		
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		
-		setOutput(new NirvanaOutputList());
-		
-		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-		getTaskConfig().setRelativeMemoryDriver(hash_frac);
-		
-		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
-		
-		final AtomicBoolean success = new AtomicBoolean(false);
-		
-		Thread taskRunner = new Thread() {
-			@Override
-			public void run() {
-				try {
-					testDriver(testTask, MockMatchStub.class);
-					success.set(true);
-				} catch (Exception ie) {
-					ie.printStackTrace();
+		final int keyCnt = 20;
+		final int valCnt = 20;
+
+		try {
+			addInput(new DelayingInfinitiveInputIterator(100));
+			addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
+
+			addDriverComparator(this.comparator1);
+			addDriverComparator(this.comparator2);
+
+			getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+
+			setOutput(new NirvanaOutputList());
+
+			getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+			getTaskConfig().setRelativeMemoryDriver(hash_frac);
+
+			final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+
+			final AtomicBoolean success = new AtomicBoolean(false);
+
+			Thread taskRunner = new Thread() {
+				@Override
+				public void run() {
+					try {
+						testDriver(testTask, MockMatchStub.class);
+						success.set(true);
+					} catch (Exception ie) {
+						ie.printStackTrace();
+					}
 				}
+			};
+			taskRunner.start();
+
+			Thread.sleep(1000);
+			cancel();
+
+			try {
+				taskRunner.join();
 			}
-		};
-		taskRunner.start();
-		
-		TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
-		tct.start();
-		
-		try {
-			tct.join();
-			taskRunner.join();
-		} catch(InterruptedException ie) {
-			Assert.fail("Joining threads failed");
+			catch (InterruptedException ie) {
+				Assert.fail("Joining threads failed");
+			}
+
+			Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
 		}
-		
-		Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
 	}
 	
 	@Test
 	public void testHashCancelMatchTaskWhileBuildSecond() {
-		int keyCnt = 20;
-		int valCnt = 20;
-		
-		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-		addInput(new DelayingInfinitiveInputIterator(100));
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		setOutput(new NirvanaOutputList());
-		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-		getTaskConfig().setRelativeMemoryDriver(hash_frac);
-		
-		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
-		
-		final AtomicBoolean success = new AtomicBoolean(false);
-		
-		Thread taskRunner = new Thread() {
-			@Override
-			public void run() {
-				try {
-					testDriver(testTask, MockMatchStub.class);
-					success.set(true);
-				} catch (Exception ie) {
-					ie.printStackTrace();
+		final int keyCnt = 20;
+		final int valCnt = 20;
+
+		try {
+			addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
+			addInput(new DelayingInfinitiveInputIterator(100));
+
+			addDriverComparator(this.comparator1);
+			addDriverComparator(this.comparator2);
+
+			getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+
+			setOutput(new NirvanaOutputList());
+
+			getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
+			getTaskConfig().setRelativeMemoryDriver(hash_frac);
+
+			final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+
+			final AtomicBoolean success = new AtomicBoolean(false);
+
+			Thread taskRunner = new Thread() {
+				@Override
+				public void run() {
+					try {
+						testDriver(testTask, MockMatchStub.class);
+						success.set(true);
+					} catch (Exception ie) {
+						ie.printStackTrace();
+					}
 				}
+			};
+			taskRunner.start();
+
+			Thread.sleep(1000);
+			cancel();
+
+			try {
+				taskRunner.join();
 			}
-		};
-		taskRunner.start();
-		
-		TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
-		tct.start();
-		
-		try {
-			tct.join();
-			taskRunner.join();
-		} catch(InterruptedException ie) {
-			Assert.fail("Joining threads failed");
+			catch (InterruptedException ie) {
+				Assert.fail("Joining threads failed");
+			}
+
+			Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
 		}
-		
-		Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
 	}
 	
 	@Test


[4/4] flink git commit: [FLINK-1608] [taskmanager] Hostname/address for TaskManager can be specified in the configuration

Posted by se...@apache.org.
[FLINK-1608] [taskmanager] Hostname/address for TaskManager can be specified in the configuration


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/933609f0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/933609f0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/933609f0

Branch: refs/heads/master
Commit: 933609f0f6a183874feeccc35b5eae47cbf57f5d
Parents: 086209c
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Feb 24 22:04:09 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Mar 2 18:36:47 2015 +0100

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  5 ++++
 .../flink/runtime/taskmanager/TaskManager.scala | 30 +++++++++++++-------
 2 files changed, 24 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/933609f0/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 767648a..245bebe 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -81,6 +81,11 @@ public final class ConfigConstants {
 	public static final String LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL = "library-cache-manager.cleanup.interval";
 
 	/**
+	 * The config parameter defining the task manager's hostname.
+	 */
+	public static final String TASK_MANAGER_HOSTNAME_KEY = "taskmanager.hostname";
+
+	/**
 	 * The config parameter defining the task manager's IPC port from the configuration.
 	 */
 	public static final String TASK_MANAGER_IPC_PORT_KEY = "taskmanager.rpc.port";

http://git-wip-us.apache.org/repos/asf/flink/blob/933609f0/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 7bfa370..7011eb4 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -834,19 +834,27 @@ object TaskManager {
 
     val (jobManagerHostname, jobManagerPort) = getAndCheckJobManagerAddress(configuration)
 
-    // try to find out the hostname of the interface from which the TaskManager
-    // can connect to the JobManager. This involves a reverse name lookup
-    LOG.info("Trying to determine network interface and address/hostname to use")
-    val jobManagerAddress = new InetSocketAddress(jobManagerHostname, jobManagerPort)
-    val taskManagerHostname = try {
-      NetUtils.resolveAddress(jobManagerAddress).getHostName()
-    }
-    catch {
-      case t: Throwable => throw new Exception("TaskManager cannot find a network interface " +
-        "that can communicate with the JobManager (" + jobManagerAddress + ")", t)
+    var taskManagerHostname = configuration.getString(
+                                       ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null)
+
+    if (taskManagerHostname != null) {
+      LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname)
     }
+    else {
+      // try to find out the hostname of the interface from which the TaskManager
+      // can connect to the JobManager. This involves a reverse name lookup
+      LOG.info("Trying to select the network interface and address/hostname to use")
+      val jobManagerAddress = new InetSocketAddress(jobManagerHostname, jobManagerPort)
+      taskManagerHostname = try {
+        NetUtils.resolveAddress(jobManagerAddress).getHostName()
+      }
+      catch {
+        case t: Throwable => throw new Exception("TaskManager cannot find a network interface " +
+          "that can communicate with the JobManager (" + jobManagerAddress + ")", t)
+      }
 
-    LOG.info("TaskManager will use hostname/address '{}' for communication.", taskManagerHostname)
+      LOG.info("TaskManager will use hostname/address '{}' for communication.", taskManagerHostname)
+    }
 
     // if no task manager port has been configured, use 0 (system will pick any free port)
     val actorSystemPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0)


[2/4] flink git commit: [FLINK-1608] [taskmanager] Network address selection makes multiple attempts before switching to heuristics.

Posted by se...@apache.org.
[FLINK-1608] [taskmanager] Network address selection makes multiple attempts before switching to heuristics.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/861ebe75
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/861ebe75
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/861ebe75

Branch: refs/heads/master
Commit: 861ebe753ff982b4cbf7c3c5b8c43fa306ac89f0
Parents: 933609f
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Feb 25 13:51:03 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Mar 2 18:36:47 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/runtime/net/NetUtils.java  | 253 ++++++++++++++++---
 .../flink/runtime/jobmanager/JobManager.scala   |   8 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  10 +-
 .../apache/flink/runtime/net/NetUtilsTest.java  |  69 +++++
 4 files changed, 303 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/861ebe75/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
index 8e0a41a..94073db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
@@ -31,73 +31,84 @@ import java.util.Enumeration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Utilities to determine the network interface and address that should be used to bind the
+ * TaskManager communication to.
+ */
 public class NetUtils {
 	
 	private static final Logger LOG = LoggerFactory.getLogger(NetUtils.class);
 
+	private static final long MIN_SLEEP_TIME = 50;
+	private static final long MAX_SLEEP_TIME = 20000;
+
 	/**
 	 * The states of address detection mechanism.
 	 * There is only a state transition if the current state failed to determine the address.
 	 */
 	private enum AddressDetectionState {
-		ADDRESS(50), 		//detect own IP based on the JobManagers IP address. Look for common prefix
-		FAST_CONNECT(50),	//try to connect to the JobManager on all Interfaces and all their addresses.
-		//this state uses a low timeout (say 50 ms) for fast detection.
-		SLOW_CONNECT(1000),	//same as FAST_CONNECT, but with a timeout of 1000 ms (1s).
+		/** Detect own IP address based on the target IP address. Look for common prefix */
+		ADDRESS(50),
+		/** Try to connect on all Interfaces and all their addresses with a low timeout */
+		FAST_CONNECT(50),
+		/** Try to connect on all Interfaces and all their addresses with a long timeout */
+		SLOW_CONNECT(1000),
+		/** Choose any non-loopback address */
 		HEURISTIC(0);
 
-
 		private int timeout;
+
 		AddressDetectionState(int timeout) {
 			this.timeout = timeout;
 		}
+
 		public int getTimeout() {
 			return timeout;
 		}
 	}
 
 	/**
-	 * Find out the TaskManager's own IP address.
+	 * Find out the TaskManager's own IP address, simple version.
 	 */
 	public static InetAddress resolveAddress(InetSocketAddress jobManagerAddress) throws IOException {
 		AddressDetectionState strategy = jobManagerAddress != null ? AddressDetectionState.ADDRESS: AddressDetectionState.HEURISTIC;
 
 		while (true) {
 			Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
-			
+
 			while (e.hasMoreElements()) {
 				NetworkInterface n = e.nextElement();
 				Enumeration<InetAddress> ee = n.getInetAddresses();
-				
+
 				while (ee.hasMoreElements()) {
 					InetAddress i = ee.nextElement();
-					
+
 					switch (strategy) {
 						case ADDRESS:
 							if (hasCommonPrefix(jobManagerAddress.getAddress().getAddress(), i.getAddress())) {
-								if (tryToConnect(i, jobManagerAddress, strategy.getTimeout())) {
+								if (tryToConnect(i, jobManagerAddress, strategy.getTimeout(), true)) {
 									LOG.info("Determined {} as the machine's own IP address", i);
 									return i;
 								}
 							}
 							break;
-							
+
 						case FAST_CONNECT:
 						case SLOW_CONNECT:
-							boolean correct = tryToConnect(i, jobManagerAddress, strategy.getTimeout());
+							boolean correct = tryToConnect(i, jobManagerAddress, strategy.getTimeout(), true);
 							if (correct) {
 								LOG.info("Determined {} as the machine's own IP address", i);
 								return i;
 							}
 							break;
-							
+
 						case HEURISTIC:
 							if (LOG.isDebugEnabled()) {
 								LOG.debug("ResolveAddress using heuristic strategy for " + i + " with" +
 										" isLinkLocalAddress:" + i.isLinkLocalAddress() +
 										" isLoopbackAddress:" + i.isLoopbackAddress() + ".");
 							}
-							
+
 							if (!i.isLinkLocalAddress() && !i.isLoopbackAddress() && i instanceof Inet4Address){
 								LOG.warn("Hostname " + InetAddress.getLocalHost().getHostName() + " resolves to " +
 										"loopback address. Using instead " + i.getHostAddress() + " on network " +
@@ -105,7 +116,7 @@ public class NetUtils {
 								return i;
 							}
 							break;
-							
+
 						default:
 							throw new RuntimeException("Unknown address detection strategy: " + strategy);
 					}
@@ -139,6 +150,171 @@ public class NetUtils {
 	}
 
 	/**
+	 * Finds the local network address from which this machine can connect to the target
+	 * address. This method tries to establish a proper network connection to the
+	 * given target, so it only succeeds if the target socket address actually accepts
+	 * connections. The method tries various strategies multiple times and uses an exponential
+	 * backoff timer between tries.
+	 * <p>
+	 * If no connection attempt was successful after the given maximum time, the method
+	 * will choose some address based on heuristics (excluding link-local and loopback addresses.)
+	 * <p>
+	 * This method will initially not log on info level (to not flood the log while the
+	 * backoff time is still very low). It will start logging after a certain time
+	 * has passed.
+	 *
+	 * @param targetAddress The address that the method tries to connect to.
+	 * @param maxWaitMillis The maximum time that this method tries to connect, before falling
+	 *                       back to the heuristics.
+	 * @param startLoggingAfter The time after which the method will log on INFO level.
+	 */
+	public static InetAddress findConnectingAddress(InetSocketAddress targetAddress,
+							long maxWaitMillis, long startLoggingAfter) throws IOException
+	{
+		if (targetAddress == null) {
+			throw new NullPointerException("targetAddress must not be null");
+		}
+		if (maxWaitMillis <= 0) {
+			throw new IllegalArgumentException("Max wait time must be positive");
+		}
+
+		final long startTime = System.currentTimeMillis();
+
+		long currentSleepTime = MIN_SLEEP_TIME;
+		long elapsedTime = 0;
+
+		// loop while there is time left
+		while (elapsedTime < maxWaitMillis) {
+			AddressDetectionState strategy = AddressDetectionState.ADDRESS;
+
+			boolean logging = elapsedTime >= startLoggingAfter;
+			if (logging) {
+				LOG.info("Trying to connect to " + targetAddress);
+			}
+			// go over the strategies ADDRESS - FAST_CONNECT - SLOW_CONNECT
+			do {
+				InetAddress address = findAddressUsingStrategy(strategy, targetAddress, logging);
+				if (address != null) {
+					return address;
+				}
+
+				// pick the next strategy
+				switch (strategy) {
+					case ADDRESS:
+						strategy = AddressDetectionState.FAST_CONNECT;
+						break;
+					case FAST_CONNECT:
+						strategy = AddressDetectionState.SLOW_CONNECT;
+						break;
+					case SLOW_CONNECT:
+						strategy = null;
+						break;
+					default:
+						throw new RuntimeException("Unsupported strategy: " + strategy);
+				}
+			}
+			while (strategy != null);
+
+			// we have made a pass with all strategies over all interfaces
+			// sleep for a while before we make the next pass
+			elapsedTime = System.currentTimeMillis() - startTime;
+
+			long toWait = Math.min(maxWaitMillis - elapsedTime, currentSleepTime);
+			if (toWait > 0) {
+				if (logging) {
+					LOG.info("Could not connect. Waiting for {} msecs before next attempt", toWait);
+				} else {
+					LOG.debug("Could not connect. Waiting for {} msecs before next attempt", toWait);
+				}
+
+				try {
+					Thread.sleep(toWait);
+				}
+				catch (InterruptedException e) {
+					throw new IOException("Connection attempts have been interrupted.");
+				}
+			}
+
+			// increase the exponential backoff timer
+			currentSleepTime = Math.min(2 * currentSleepTime, MAX_SLEEP_TIME);
+		}
+
+		// our attempts timed out. use the heuristic fallback
+		LOG.warn("Could not connect to {}. Selecting a local address using heuristics.", targetAddress);
+		InetAddress heuristic = findAddressUsingStrategy(AddressDetectionState.HEURISTIC, targetAddress, true);
+		if (heuristic != null) {
+			return heuristic;
+		}
+		else {
+			LOG.warn("Could not find any IPv4 address that is not loopback or link-local. Using localhost address.");
+			return InetAddress.getLocalHost();
+		}
+	}
+
+
+	private static InetAddress findAddressUsingStrategy(AddressDetectionState strategy,
+														InetSocketAddress targetAddress,
+														boolean logging) throws IOException
+	{
+		final byte[] targetAddressBytes = targetAddress.getAddress().getAddress();
+
+		// for each network interface
+		Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
+		while (e.hasMoreElements()) {
+
+			NetworkInterface netInterface = e.nextElement();
+
+			// for each address of the network interface
+			Enumeration<InetAddress> ee = netInterface.getInetAddresses();
+			while (ee.hasMoreElements()) {
+				InetAddress interfaceAddress = ee.nextElement();
+
+				switch (strategy) {
+					case ADDRESS:
+						if (hasCommonPrefix(targetAddressBytes, interfaceAddress.getAddress())) {
+							LOG.debug("Target address {} and local address {} share prefix - trying to connect.",
+										targetAddress, interfaceAddress);
+
+							if (tryToConnect(interfaceAddress, targetAddress, strategy.getTimeout(), logging)) {
+								return interfaceAddress;
+							}
+						}
+						break;
+
+					case FAST_CONNECT:
+					case SLOW_CONNECT:
+						LOG.debug("Trying to connect to {} from local address {} with timeout {}",
+								targetAddress, interfaceAddress, strategy.getTimeout());
+
+						if (tryToConnect(interfaceAddress, targetAddress, strategy.getTimeout(), logging)) {
+							return interfaceAddress;
+						}
+						break;
+
+					case HEURISTIC:
+						if (LOG.isDebugEnabled()) {
+							LOG.debug("Checking address {} using heuristics: linkLocal: {} loopback: {}",
+									interfaceAddress, interfaceAddress.isLinkLocalAddress(),
+									interfaceAddress.isLoopbackAddress());
+						}
+						// pick a non-loopback non-link-local address
+						if (interfaceAddress instanceof Inet4Address && !interfaceAddress.isLinkLocalAddress() &&
+								!interfaceAddress.isLoopbackAddress())
+						{
+							return interfaceAddress;
+						}
+						break;
+
+					default:
+						throw new RuntimeException("Unsupported strategy: " + strategy);
+				}
+			} // end for each address of the interface
+		} // end for each interface
+
+		return null;
+	}
+
+	/**
 	 * Checks if two addresses have a common prefix (first 2 bytes).
 	 * Example: 192.168.???.???
 	 * Works also with ipv6, but accepts probably too many addresses
@@ -147,35 +323,52 @@ public class NetUtils {
 		return address[0] == address2[0] && address[1] == address2[1];
 	}
 
-	public static boolean tryToConnect(InetAddress fromAddress, SocketAddress toSocket, int timeout) throws IOException {
+	/**
+	 * Tries to connect to the given socket address from the given local address.
+	 *
+	 * @param fromAddress The address to connect from.
+	 * @param toSocket The socket address to connect to.
+	 * @param timeout The timeout for the connection (in milliseconds).
+	 * @param logFailed Flag to indicate whether to log failed attempts on info level
+	 *                  (failed attempts are always logged on DEBUG level).
+	 * @return True, if the connection was successful, false otherwise.
+	 * @throws IOException Thrown if the socket cleanup fails.
+	 */
+	private static boolean tryToConnect(InetAddress fromAddress, SocketAddress toSocket,
+										int timeout, boolean logFailed) throws IOException
+	{
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("Trying to connect to JobManager (" + toSocket + ") from local address " + fromAddress
+			LOG.debug("Trying to connect to (" + toSocket + ") from local address " + fromAddress
 					+ " with timeout " + timeout);
 		}
-		boolean connectable = true;
-		Socket socket = null;
+		Socket socket = new Socket();
 		try {
-			socket = new Socket();
-			SocketAddress bindP = new InetSocketAddress(fromAddress, 0); // 0 = let the OS choose the port on this
+			// port 0 = let the OS choose the port
+			SocketAddress bindP = new InetSocketAddress(fromAddress, 0);
+			// bind to the given local address, then connect with the given timeout
 			socket.bind(bindP);
 			socket.connect(toSocket, timeout);
-		} catch (Exception ex) {
-			LOG.info("Failed to connect to JobManager from address '" + fromAddress + "': " + ex.getMessage());
+			return true;
+		}
+		catch (Exception ex) {
+			String message = "Failed to connect from address '" + fromAddress + "': " + ex.getMessage();
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("Failed with exception", ex);
+				LOG.debug(message, ex);
+			} else if (logFailed) {
+				LOG.info(message);
 			}
-			connectable = false;
+			return false;
 		}
 		finally {
-			if (socket != null) {
-				socket.close();
-			}
+			socket.close();
 		}
-		return connectable;
 	}
 
-	public static final int getAvailablePort() {
+	/**
+	 * Find a non-occupied port.
+	 *
+	 * @return A non-occupied port.
+	 */
+	public static int getAvailablePort() {
 		for (int i = 0; i < 50; i++) {
 			ServerSocket serverSocket = null;
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/861ebe75/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 415a20c..85ebdce 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -756,8 +756,8 @@ object JobManager {
   def parseArgs(args: Array[String]): (Configuration, ExecutionMode, String, Int) = {
     val parser = new scopt.OptionParser[JobManagerCLIConfiguration]("JobManager") {
       head("Flink JobManager")
-      opt[String]("configDir") action { (arg, c) => c.copy(configDir = arg) } text ("Specify " +
-        "the configuration directory.")
+      opt[String]("configDir") action { (arg, c) => c.copy(configDir = arg) } text (
+        "The configuration directory.")
       opt[String]("executionMode") optional() action { (arg, c) =>
         if(arg.equals("local")){
           c.copy(executionMode = LOCAL)
@@ -765,7 +765,7 @@ object JobManager {
           c.copy(executionMode = CLUSTER)
         }
       } text {
-        "Specify the execution mode of the JobManager (CLUSTER / LOCAL)"
+        "The execution mode of the JobManager (CLUSTER / LOCAL)"
       }
     }
 
@@ -785,7 +785,7 @@ object JobManager {
 
         (configuration, config.executionMode, hostname, port)
     } getOrElse {
-      throw new Exception("Invalid command line arguments. Usage: " + parser.usage)
+      throw new Exception("Invalid command line arguments: " + parser.usage)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/861ebe75/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 7011eb4..182f8cf 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -102,7 +102,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
 
   log.info("Starting task manager at {}.", self.path)
   log.info("Creating {} task slot(s).", numberOfSlots)
-  log.info("TaskManager connection information {}.", connectionInfo)
+  log.info("TaskManager connection information: {}", connectionInfo)
 
   val HEARTBEAT_INTERVAL = 5000 millisecond
 
@@ -843,10 +843,14 @@ object TaskManager {
     else {
       // try to find out the hostname of the interface from which the TaskManager
       // can connect to the JobManager. This involves a reverse name lookup
-      LOG.info("Trying to select the network interface and address/hostname to use")
+      LOG.info("Trying to select the network interface and address to use " +
+        "by connecting to the configured JobManager")
+
       val jobManagerAddress = new InetSocketAddress(jobManagerHostname, jobManagerPort)
       taskManagerHostname = try {
-        NetUtils.resolveAddress(jobManagerAddress).getHostName()
+        // try to get the address for up to two minutes and start
+        // logging only after ten seconds
+        NetUtils.findConnectingAddress(jobManagerAddress, 120000, 10000).getHostName()
       }
       catch {
         case t: Throwable => throw new Exception("TaskManager cannot find a network interface " +

http://git-wip-us.apache.org/repos/asf/flink/blob/861ebe75/flink-runtime/src/test/java/org/apache/flink/runtime/net/NetUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/NetUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/NetUtilsTest.java
new file mode 100644
index 0000000..3794fa0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/NetUtilsTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.net;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+/**
+ * Tests for the network utilities.
+ */
+public class NetUtilsTest {
+
+	@Test
+	public void testFindConnectableAddress() {
+		int unusedPort;
+		try {
+			unusedPort = NetUtils.getAvailablePort();
+		}
+		catch (Throwable t) {
+			// if this system cannot find an available port,
+			// skip this test
+			return;
+		}
+
+		try {
+			// create an unreachable target address
+			InetSocketAddress unreachable = new InetSocketAddress("localhost", unusedPort);
+
+			final long start = System.currentTimeMillis();
+			InetAddress add = NetUtils.findConnectingAddress(unreachable, 2000, 400);
+
+			// check that it did not take forever
+			assertTrue(System.currentTimeMillis() - start < 8000);
+
+			// we should have found a heuristic address
+			assertNotNull(add);
+
+			// these checks are desirable, but will not work on every machine
+			// such as machines with no connected network media, which may
+			// default to a link local address
+			// assertFalse(add.isLinkLocalAddress());
+			// assertFalse(add.isLoopbackAddress());
+			// assertFalse(add.isAnyLocalAddress());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}


[3/4] flink git commit: [runtime] Extend environment logging on startup

Posted by se...@apache.org.
[runtime] Extend environment logging on startup


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/caa29417
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/caa29417
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/caa29417

Branch: refs/heads/master
Commit: caa29417d5bf6b234af06d7ccf31acb1d8e32fd9
Parents: 861ebe7
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Feb 27 13:44:28 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Mar 2 18:36:47 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/WebFrontend.java    |  2 +-
 .../runtime/util/EnvironmentInformation.java    | 20 +++++++++++++++-----
 .../flink/runtime/jobmanager/JobManager.scala   |  8 +++++---
 .../flink/runtime/taskmanager/TaskManager.scala |  2 +-
 .../util/EnvironmentInformationTest.java        | 15 +++++++++++++++
 .../yarn/appMaster/YarnTaskManagerRunner.java   |  2 +-
 6 files changed, 38 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/caa29417/flink-clients/src/main/java/org/apache/flink/client/WebFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/WebFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/WebFrontend.java
index 45f4391..9587ab2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/WebFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/WebFrontend.java
@@ -43,7 +43,7 @@ public class WebFrontend {
 	 */
 	public static void main(String[] args) {
 
-		EnvironmentInformation.logEnvironmentInfo(LOG, "Web Client");
+		EnvironmentInformation.logEnvironmentInfo(LOG, "Web Client", args);
 		EnvironmentInformation.checkJavaVersion();
 
 		// check the arguments

http://git-wip-us.apache.org/repos/asf/flink/blob/caa29417/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
index 1fb6422..793e158 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
@@ -208,7 +208,7 @@ public class EnvironmentInformation {
 	 * @param log The logger to log the information to.
 	 * @param componentName The component name to mention in the log.
 	 */
-	public static void logEnvironmentInfo(Logger log, String componentName) {
+	public static void logEnvironmentInfo(Logger log, String componentName, String[] commandLineArgs) {
 		if (log.isInfoEnabled()) {
 			RevisionInformation rev = getRevisionInformation();
 			String version = getVersion();
@@ -227,19 +227,29 @@ public class EnvironmentInformation {
 					+ "Rev:" + rev.commitId + ", " + "Date:" + rev.commitDate + ")");
 			log.info(" Current user: " + user);
 			log.info(" JVM: " + jvmVersion);
+			log.info(" Maximum heap size: " + maxHeapMegabytes + " MiBytes");
+			log.info(" JAVA_HOME: " + (javaHome == null ? "(not set)" : javaHome));
 
 			if (options.length == 0) {
-				log.info(" Startup Options: (none)");
+				log.info(" JVM Options: (none)");
 			}
 			else {
-				log.info(" Startup Options:");
+				log.info(" JVM Options:");
 				for (String s: options) {
 					log.info("    " + s);
 				}
 			}
 
-			log.info(" Maximum heap size: " + maxHeapMegabytes + " MiBytes");
-			log.info(" JAVA_HOME: " + (javaHome == null ? "not set" : javaHome));
+			if (commandLineArgs == null || commandLineArgs.length == 0) {
+				log.info(" Program Arguments: (none)");
+			}
+			else {
+				log.info(" Program Arguments:");
+				for (String s: commandLineArgs) {
+					log.info("    " + s);
+				}
+			}
+
 			log.info("--------------------------------------------------------------------------------");
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/caa29417/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 85ebdce..7238e3d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -608,7 +608,7 @@ object JobManager {
   def main(args: Array[String]): Unit = {
 
     // startup checks and logging
-    EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager")
+    EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager", args)
     EnvironmentInformation.checkJavaVersion()
 
     // parsing the command line arguments
@@ -759,10 +759,12 @@ object JobManager {
       opt[String]("configDir") action { (arg, c) => c.copy(configDir = arg) } text (
         "The configuration directory.")
       opt[String]("executionMode") optional() action { (arg, c) =>
-        if(arg.equals("local")){
+        if (arg.equalsIgnoreCase("local")){
           c.copy(executionMode = LOCAL)
-        }else{
+        } else if (arg.equalsIgnoreCase("cluster")) {
           c.copy(executionMode = CLUSTER)
+        } else {
+          throw new Exception("Unknown execution mode: " + arg)
         }
       } text {
         "The execution mode of the JobManager (CLUSTER / LOCAL)"

http://git-wip-us.apache.org/repos/asf/flink/blob/caa29417/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 182f8cf..3d47258 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -743,7 +743,7 @@ object TaskManager {
    */
   def main(args: Array[String]): Unit = {
     // startup checks and logging
-    EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager")
+    EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args)
     EnvironmentInformation.checkJavaVersion()
 
     // try to parse the command line arguments

http://git-wip-us.apache.org/repos/asf/flink/blob/caa29417/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java
index 64a676c..8da7b14 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.util;
 import static org.junit.Assert.*;
 
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
 
 public class EnvironmentInformationTest {
 
@@ -58,4 +60,17 @@ public class EnvironmentInformationTest {
 			fail(e.getMessage());
 		}
 	}
+
+	@Test
+	public void testLogEnvironmentInformation() {
+		try {
+			Logger mockLogger = Mockito.mock(Logger.class);
+			EnvironmentInformation.logEnvironmentInfo(mockLogger, "test", new String[0]);
+			EnvironmentInformation.logEnvironmentInfo(mockLogger, "test", null);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/caa29417/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
index bed8f19..214798c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
@@ -46,7 +46,7 @@ public class YarnTaskManagerRunner {
 
 	public static void main(final String[] args) throws IOException {
 
-		EnvironmentInformation.logEnvironmentInfo(LOG, "YARN TaskManager");
+		EnvironmentInformation.logEnvironmentInfo(LOG, "YARN TaskManager", args);
 		EnvironmentInformation.checkJavaVersion();
 
 		// try to parse the command line arguments