You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/02 19:43:54 UTC
[1/4] flink git commit: [FLINK-1626] [tests] Fix spurious failure in
MatchTaskTest
Repository: flink
Updated Branches:
refs/heads/master 086209cae -> e038629ee
[FLINK-1626] [tests] Fix spurious failure in MatchTaskTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e038629e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e038629e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e038629e
Branch: refs/heads/master
Commit: e038629eecf45b2f0445ba60603467dafc565aae
Parents: caa2941
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Mar 2 17:41:41 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Mar 2 18:36:47 2015 +0100
----------------------------------------------------------------------
.../flink/runtime/operators/MatchTaskTest.java | 172 ++++++++++---------
1 file changed, 94 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e038629e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
index 553212a..16aea69 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
@@ -756,94 +756,110 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
@Test
public void testCancelHashMatchTaskWhileBuildFirst() {
- int keyCnt = 20;
- int valCnt = 20;
-
- addInput(new DelayingInfinitiveInputIterator(100));
- addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-
- addDriverComparator(this.comparator1);
- addDriverComparator(this.comparator2);
-
- getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-
- setOutput(new NirvanaOutputList());
-
- getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
- getTaskConfig().setRelativeMemoryDriver(hash_frac);
-
- final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
-
- final AtomicBoolean success = new AtomicBoolean(false);
-
- Thread taskRunner = new Thread() {
- @Override
- public void run() {
- try {
- testDriver(testTask, MockMatchStub.class);
- success.set(true);
- } catch (Exception ie) {
- ie.printStackTrace();
+ final int keyCnt = 20;
+ final int valCnt = 20;
+
+ try {
+ addInput(new DelayingInfinitiveInputIterator(100));
+ addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
+
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
+
+ getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+
+ setOutput(new NirvanaOutputList());
+
+ getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+ getTaskConfig().setRelativeMemoryDriver(hash_frac);
+
+ final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+
+ final AtomicBoolean success = new AtomicBoolean(false);
+
+ Thread taskRunner = new Thread() {
+ @Override
+ public void run() {
+ try {
+ testDriver(testTask, MockMatchStub.class);
+ success.set(true);
+ } catch (Exception ie) {
+ ie.printStackTrace();
+ }
}
+ };
+ taskRunner.start();
+
+ Thread.sleep(1000);
+ cancel();
+
+ try {
+ taskRunner.join();
}
- };
- taskRunner.start();
-
- TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
- tct.start();
-
- try {
- tct.join();
- taskRunner.join();
- } catch(InterruptedException ie) {
- Assert.fail("Joining threads failed");
+ catch (InterruptedException ie) {
+ Assert.fail("Joining threads failed");
+ }
+
+ Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
}
-
- Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
}
@Test
public void testHashCancelMatchTaskWhileBuildSecond() {
- int keyCnt = 20;
- int valCnt = 20;
-
- addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
- addInput(new DelayingInfinitiveInputIterator(100));
- addDriverComparator(this.comparator1);
- addDriverComparator(this.comparator2);
- getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
- setOutput(new NirvanaOutputList());
- getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
- getTaskConfig().setRelativeMemoryDriver(hash_frac);
-
- final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
-
- final AtomicBoolean success = new AtomicBoolean(false);
-
- Thread taskRunner = new Thread() {
- @Override
- public void run() {
- try {
- testDriver(testTask, MockMatchStub.class);
- success.set(true);
- } catch (Exception ie) {
- ie.printStackTrace();
+ final int keyCnt = 20;
+ final int valCnt = 20;
+
+ try {
+ addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
+ addInput(new DelayingInfinitiveInputIterator(100));
+
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
+
+ getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+
+ setOutput(new NirvanaOutputList());
+
+ getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
+ getTaskConfig().setRelativeMemoryDriver(hash_frac);
+
+ final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+
+ final AtomicBoolean success = new AtomicBoolean(false);
+
+ Thread taskRunner = new Thread() {
+ @Override
+ public void run() {
+ try {
+ testDriver(testTask, MockMatchStub.class);
+ success.set(true);
+ } catch (Exception ie) {
+ ie.printStackTrace();
+ }
}
+ };
+ taskRunner.start();
+
+ Thread.sleep(1000);
+ cancel();
+
+ try {
+ taskRunner.join();
}
- };
- taskRunner.start();
-
- TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
- tct.start();
-
- try {
- tct.join();
- taskRunner.join();
- } catch(InterruptedException ie) {
- Assert.fail("Joining threads failed");
+ catch (InterruptedException ie) {
+ Assert.fail("Joining threads failed");
+ }
+
+ Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
}
-
- Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
}
@Test
[4/4] flink git commit: [FLINK-1608] [taskmanager] Hostname/address
for TaskManager can be specified in the configuration
Posted by se...@apache.org.
[FLINK-1608] [taskmanager] Hostname/address for TaskManager can be specified in the configuration
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/933609f0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/933609f0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/933609f0
Branch: refs/heads/master
Commit: 933609f0f6a183874feeccc35b5eae47cbf57f5d
Parents: 086209c
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Feb 24 22:04:09 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Mar 2 18:36:47 2015 +0100
----------------------------------------------------------------------
.../flink/configuration/ConfigConstants.java | 5 ++++
.../flink/runtime/taskmanager/TaskManager.scala | 30 +++++++++++++-------
2 files changed, 24 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/933609f0/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 767648a..245bebe 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -81,6 +81,11 @@ public final class ConfigConstants {
public static final String LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL = "library-cache-manager.cleanup.interval";
/**
+ * The config parameter defining the task manager's hostname.
+ */
+ public static final String TASK_MANAGER_HOSTNAME_KEY = "taskmanager.hostname";
+
+ /**
* The config parameter defining the task manager's IPC port from the configuration.
*/
public static final String TASK_MANAGER_IPC_PORT_KEY = "taskmanager.rpc.port";
http://git-wip-us.apache.org/repos/asf/flink/blob/933609f0/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 7bfa370..7011eb4 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -834,19 +834,27 @@ object TaskManager {
val (jobManagerHostname, jobManagerPort) = getAndCheckJobManagerAddress(configuration)
- // try to find out the hostname of the interface from which the TaskManager
- // can connect to the JobManager. This involves a reverse name lookup
- LOG.info("Trying to determine network interface and address/hostname to use")
- val jobManagerAddress = new InetSocketAddress(jobManagerHostname, jobManagerPort)
- val taskManagerHostname = try {
- NetUtils.resolveAddress(jobManagerAddress).getHostName()
- }
- catch {
- case t: Throwable => throw new Exception("TaskManager cannot find a network interface " +
- "that can communicate with the JobManager (" + jobManagerAddress + ")", t)
+ var taskManagerHostname = configuration.getString(
+ ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null)
+
+ if (taskManagerHostname != null) {
+ LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname)
}
+ else {
+ // try to find out the hostname of the interface from which the TaskManager
+ // can connect to the JobManager. This involves a reverse name lookup
+ LOG.info("Trying to select the network interface and address/hostname to use")
+ val jobManagerAddress = new InetSocketAddress(jobManagerHostname, jobManagerPort)
+ taskManagerHostname = try {
+ NetUtils.resolveAddress(jobManagerAddress).getHostName()
+ }
+ catch {
+ case t: Throwable => throw new Exception("TaskManager cannot find a network interface " +
+ "that can communicate with the JobManager (" + jobManagerAddress + ")", t)
+ }
- LOG.info("TaskManager will use hostname/address '{}' for communication.", taskManagerHostname)
+ LOG.info("TaskManager will use hostname/address '{}' for communication.", taskManagerHostname)
+ }
// if no task manager port has been configured, use 0 (system will pick any free port)
val actorSystemPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0)
[2/4] flink git commit: [FLINK-1608] [taskmanager] Network address
selection makes multiple attempts before switching to heuristics.
Posted by se...@apache.org.
[FLINK-1608] [taskmanager] Network address selection makes multiple attempts before switching to heuristics.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/861ebe75
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/861ebe75
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/861ebe75
Branch: refs/heads/master
Commit: 861ebe753ff982b4cbf7c3c5b8c43fa306ac89f0
Parents: 933609f
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Feb 25 13:51:03 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Mar 2 18:36:47 2015 +0100
----------------------------------------------------------------------
.../org/apache/flink/runtime/net/NetUtils.java | 253 ++++++++++++++++---
.../flink/runtime/jobmanager/JobManager.scala | 8 +-
.../flink/runtime/taskmanager/TaskManager.scala | 10 +-
.../apache/flink/runtime/net/NetUtilsTest.java | 69 +++++
4 files changed, 303 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/861ebe75/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
index 8e0a41a..94073db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
@@ -31,73 +31,84 @@ import java.util.Enumeration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Utilities to determine the network interface and address that should be used to bind the
+ * TaskManager communication to.
+ */
public class NetUtils {
private static final Logger LOG = LoggerFactory.getLogger(NetUtils.class);
+ private static final long MIN_SLEEP_TIME = 50;
+ private static final long MAX_SLEEP_TIME = 20000;
+
/**
* The states of address detection mechanism.
* There is only a state transition if the current state failed to determine the address.
*/
private enum AddressDetectionState {
- ADDRESS(50), //detect own IP based on the JobManagers IP address. Look for common prefix
- FAST_CONNECT(50), //try to connect to the JobManager on all Interfaces and all their addresses.
- //this state uses a low timeout (say 50 ms) for fast detection.
- SLOW_CONNECT(1000), //same as FAST_CONNECT, but with a timeout of 1000 ms (1s).
+ /** Detect own IP address based on the target IP address. Look for common prefix */
+ ADDRESS(50),
+ /** Try to connect on all Interfaces and all their addresses with a low timeout */
+ FAST_CONNECT(50),
+ /** Try to connect on all Interfaces and all their addresses with a long timeout */
+ SLOW_CONNECT(1000),
+ /** Choose any non-loopback address */
HEURISTIC(0);
-
private int timeout;
+
AddressDetectionState(int timeout) {
this.timeout = timeout;
}
+
public int getTimeout() {
return timeout;
}
}
/**
- * Find out the TaskManager's own IP address.
+ * Find out the TaskManager's own IP address, simple version.
*/
public static InetAddress resolveAddress(InetSocketAddress jobManagerAddress) throws IOException {
AddressDetectionState strategy = jobManagerAddress != null ? AddressDetectionState.ADDRESS: AddressDetectionState.HEURISTIC;
while (true) {
Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
-
+
while (e.hasMoreElements()) {
NetworkInterface n = e.nextElement();
Enumeration<InetAddress> ee = n.getInetAddresses();
-
+
while (ee.hasMoreElements()) {
InetAddress i = ee.nextElement();
-
+
switch (strategy) {
case ADDRESS:
if (hasCommonPrefix(jobManagerAddress.getAddress().getAddress(), i.getAddress())) {
- if (tryToConnect(i, jobManagerAddress, strategy.getTimeout())) {
+ if (tryToConnect(i, jobManagerAddress, strategy.getTimeout(), true)) {
LOG.info("Determined {} as the machine's own IP address", i);
return i;
}
}
break;
-
+
case FAST_CONNECT:
case SLOW_CONNECT:
- boolean correct = tryToConnect(i, jobManagerAddress, strategy.getTimeout());
+ boolean correct = tryToConnect(i, jobManagerAddress, strategy.getTimeout(), true);
if (correct) {
LOG.info("Determined {} as the machine's own IP address", i);
return i;
}
break;
-
+
case HEURISTIC:
if (LOG.isDebugEnabled()) {
LOG.debug("ResolveAddress using heuristic strategy for " + i + " with" +
" isLinkLocalAddress:" + i.isLinkLocalAddress() +
" isLoopbackAddress:" + i.isLoopbackAddress() + ".");
}
-
+
if (!i.isLinkLocalAddress() && !i.isLoopbackAddress() && i instanceof Inet4Address){
LOG.warn("Hostname " + InetAddress.getLocalHost().getHostName() + " resolves to " +
"loopback address. Using instead " + i.getHostAddress() + " on network " +
@@ -105,7 +116,7 @@ public class NetUtils {
return i;
}
break;
-
+
default:
throw new RuntimeException("Unknown address detection strategy: " + strategy);
}
@@ -139,6 +150,171 @@ public class NetUtils {
}
/**
+ * Finds the local network address from which this machine can connect to the target
+ * address. This method tries to establish a proper network connection to the
+ * given target, so it only succeeds if the target socket address actually accepts
+ * connections. The method tries various strategies multiple times and uses an exponential
+ * backoff timer between tries.
+ * <p>
+ * If no connection attempt was successful after the given maximum time, the method
+ * will choose some address based on heuristics (excluding link-local and loopback addresses.)
+ * <p>
+ * This method will initially not log on info level (to not flood the log while the
+ * backoff time is still very low). It will start logging after a certain time
+ * has passed.
+ *
+ * @param targetAddress The address that the method tries to connect to.
+ * @param maxWaitMillis The maximum time that this method tries to connect, before falling
+ * back to the heuristics.
+ * @param startLoggingAfter The time after which the method will log on INFO level.
+ */
+ public static InetAddress findConnectingAddress(InetSocketAddress targetAddress,
+ long maxWaitMillis, long startLoggingAfter) throws IOException
+ {
+ if (targetAddress == null) {
+ throw new NullPointerException("targetAddress must not be null");
+ }
+ if (maxWaitMillis <= 0) {
+ throw new IllegalArgumentException("Max wait time must be positive");
+ }
+
+ final long startTime = System.currentTimeMillis();
+
+ long currentSleepTime = MIN_SLEEP_TIME;
+ long elapsedTime = 0;
+
+ // loop while there is time left
+ while (elapsedTime < maxWaitMillis) {
+ AddressDetectionState strategy = AddressDetectionState.ADDRESS;
+
+ boolean logging = elapsedTime >= startLoggingAfter;
+ if (logging) {
+ LOG.info("Trying to connect to " + targetAddress);
+ }
+ // go over the strategies ADDRESS - FAST_CONNECT - SLOW_CONNECT
+ do {
+ InetAddress address = findAddressUsingStrategy(strategy, targetAddress, logging);
+ if (address != null) {
+ return address;
+ }
+
+ // pick the next strategy
+ switch (strategy) {
+ case ADDRESS:
+ strategy = AddressDetectionState.FAST_CONNECT;
+ break;
+ case FAST_CONNECT:
+ strategy = AddressDetectionState.SLOW_CONNECT;
+ break;
+ case SLOW_CONNECT:
+ strategy = null;
+ break;
+ default:
+ throw new RuntimeException("Unsupported strategy: " + strategy);
+ }
+ }
+ while (strategy != null);
+
+ // we have made a pass with all strategies over all interfaces
+ // sleep for a while before we make the next pass
+ elapsedTime = System.currentTimeMillis() - startTime;
+
+ long toWait = Math.min(maxWaitMillis - elapsedTime, currentSleepTime);
+ if (toWait > 0) {
+ if (logging) {
+ LOG.info("Could not connect. Waiting for {} msecs before next attempt", toWait);
+ } else {
+ LOG.debug("Could not connect. Waiting for {} msecs before next attempt", toWait);
+ }
+
+ try {
+ Thread.sleep(toWait);
+ }
+ catch (InterruptedException e) {
+ throw new IOException("Connection attempts have been interrupted.");
+ }
+ }
+
+ // increase the exponential backoff timer
+ currentSleepTime = Math.min(2 * currentSleepTime, MAX_SLEEP_TIME);
+ }
+
+ // our attempts timed out. use the heuristic fallback
+ LOG.warn("Could not connect to {}. Selecting a local address using heuristics.", targetAddress);
+ InetAddress heuristic = findAddressUsingStrategy(AddressDetectionState.HEURISTIC, targetAddress, true);
+ if (heuristic != null) {
+ return heuristic;
+ }
+ else {
+ LOG.warn("Could not find any IPv4 address that is not loopback or link-local. Using localhost address.");
+ return InetAddress.getLocalHost();
+ }
+ }
+
+
+ private static InetAddress findAddressUsingStrategy(AddressDetectionState strategy,
+ InetSocketAddress targetAddress,
+ boolean logging) throws IOException
+ {
+ final byte[] targetAddressBytes = targetAddress.getAddress().getAddress();
+
+ // for each network interface
+ Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
+ while (e.hasMoreElements()) {
+
+ NetworkInterface netInterface = e.nextElement();
+
+ // for each address of the network interface
+ Enumeration<InetAddress> ee = netInterface.getInetAddresses();
+ while (ee.hasMoreElements()) {
+ InetAddress interfaceAddress = ee.nextElement();
+
+ switch (strategy) {
+ case ADDRESS:
+ if (hasCommonPrefix(targetAddressBytes, interfaceAddress.getAddress())) {
+ LOG.debug("Target address {} and local address {} share prefix - trying to connect.",
+ targetAddress, interfaceAddress);
+
+ if (tryToConnect(interfaceAddress, targetAddress, strategy.getTimeout(), logging)) {
+ return interfaceAddress;
+ }
+ }
+ break;
+
+ case FAST_CONNECT:
+ case SLOW_CONNECT:
+ LOG.debug("Trying to connect to {} from local address {} with timeout {}",
+ targetAddress, interfaceAddress, strategy.getTimeout());
+
+ if (tryToConnect(interfaceAddress, targetAddress, strategy.getTimeout(), logging)) {
+ return interfaceAddress;
+ }
+ break;
+
+ case HEURISTIC:
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Checking address {} using heuristics: linkLocal: {} loopback: {}",
+ interfaceAddress, interfaceAddress.isLinkLocalAddress(),
+ interfaceAddress.isLoopbackAddress());
+ }
+ // pick a non-loopback non-link-local address
+ if (interfaceAddress instanceof Inet4Address && !interfaceAddress.isLinkLocalAddress() &&
+ !interfaceAddress.isLoopbackAddress())
+ {
+ return interfaceAddress;
+ }
+ break;
+
+ default:
+ throw new RuntimeException("Unsupported strategy: " + strategy);
+ }
+ } // end for each address of the interface
+ } // end for each interface
+
+ return null;
+ }
+
+ /**
* Checks if two addresses have a common prefix (first 2 bytes).
* Example: 192.168.???.???
* Works also with ipv6, but accepts probably too many addresses
@@ -147,35 +323,52 @@ public class NetUtils {
return address[0] == address2[0] && address[1] == address2[1];
}
- public static boolean tryToConnect(InetAddress fromAddress, SocketAddress toSocket, int timeout) throws IOException {
+ /**
+ * Tries to open a connection from the given local address to the given socket address.
+ * @param fromAddress The address to connect from.
+ * @param toSocket The socket address to connect to.
+ * @param timeout The timeout for the connection.
+ * @param logFailed Flag to indicate whether to log failed attempts on info level
+ * (failed attempts are always logged on DEBUG level).
+ * @return True, if the connection was successful, false otherwise.
+ * @throws IOException Thrown if the socket cleanup fails.
+ */
+ private static boolean tryToConnect(InetAddress fromAddress, SocketAddress toSocket,
+ int timeout, boolean logFailed) throws IOException
+ {
if (LOG.isDebugEnabled()) {
- LOG.debug("Trying to connect to JobManager (" + toSocket + ") from local address " + fromAddress
+ LOG.debug("Trying to connect to (" + toSocket + ") from local address " + fromAddress
+ " with timeout " + timeout);
}
- boolean connectable = true;
- Socket socket = null;
+ Socket socket = new Socket();
try {
- socket = new Socket();
- SocketAddress bindP = new InetSocketAddress(fromAddress, 0); // 0 = let the OS choose the port on this
+ // port 0 = let the OS choose the port
+ SocketAddress bindP = new InetSocketAddress(fromAddress, 0);
// machine
socket.bind(bindP);
socket.connect(toSocket, timeout);
- } catch (Exception ex) {
- LOG.info("Failed to connect to JobManager from address '" + fromAddress + "': " + ex.getMessage());
+ return true;
+ }
+ catch (Exception ex) {
+ String message = "Failed to connect from address '" + fromAddress + "': " + ex.getMessage();
if (LOG.isDebugEnabled()) {
- LOG.debug("Failed with exception", ex);
+ LOG.debug(message, ex);
+ } else if (logFailed) {
+ LOG.info(message);
}
- connectable = false;
+ return false;
}
finally {
- if (socket != null) {
- socket.close();
- }
+ socket.close();
}
- return connectable;
}
- public static final int getAvailablePort() {
+ /**
+ * Find a non-occupied port.
+ *
+ * @return A non-occupied port.
+ */
+ public static int getAvailablePort() {
for (int i = 0; i < 50; i++) {
ServerSocket serverSocket = null;
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/861ebe75/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 415a20c..85ebdce 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -756,8 +756,8 @@ object JobManager {
def parseArgs(args: Array[String]): (Configuration, ExecutionMode, String, Int) = {
val parser = new scopt.OptionParser[JobManagerCLIConfiguration]("JobManager") {
head("Flink JobManager")
- opt[String]("configDir") action { (arg, c) => c.copy(configDir = arg) } text ("Specify " +
- "the configuration directory.")
+ opt[String]("configDir") action { (arg, c) => c.copy(configDir = arg) } text (
+ "The configuration directory.")
opt[String]("executionMode") optional() action { (arg, c) =>
if(arg.equals("local")){
c.copy(executionMode = LOCAL)
@@ -765,7 +765,7 @@ object JobManager {
c.copy(executionMode = CLUSTER)
}
} text {
- "Specify the execution mode of the JobManager (CLUSTER / LOCAL)"
+ "The execution mode of the JobManager (CLUSTER / LOCAL)"
}
}
@@ -785,7 +785,7 @@ object JobManager {
(configuration, config.executionMode, hostname, port)
} getOrElse {
- throw new Exception("Invalid command line arguments. Usage: " + parser.usage)
+ throw new Exception("Invalid command line arguments: " + parser.usage)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/861ebe75/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 7011eb4..182f8cf 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -102,7 +102,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
log.info("Starting task manager at {}.", self.path)
log.info("Creating {} task slot(s).", numberOfSlots)
- log.info("TaskManager connection information {}.", connectionInfo)
+ log.info("TaskManager connection information: {}", connectionInfo)
val HEARTBEAT_INTERVAL = 5000 millisecond
@@ -843,10 +843,14 @@ object TaskManager {
else {
// try to find out the hostname of the interface from which the TaskManager
// can connect to the JobManager. This involves a reverse name lookup
- LOG.info("Trying to select the network interface and address/hostname to use")
+ LOG.info("Trying to select the network interface and address to use " +
+ "by connecting to the configured JobManager")
+
val jobManagerAddress = new InetSocketAddress(jobManagerHostname, jobManagerPort)
taskManagerHostname = try {
- NetUtils.resolveAddress(jobManagerAddress).getHostName()
+ // try to get the address for up to two minutes and start
+ // logging only after ten seconds
+ NetUtils.findConnectingAddress(jobManagerAddress, 120000, 10000).getHostName()
}
catch {
case t: Throwable => throw new Exception("TaskManager cannot find a network interface " +
http://git-wip-us.apache.org/repos/asf/flink/blob/861ebe75/flink-runtime/src/test/java/org/apache/flink/runtime/net/NetUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/NetUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/NetUtilsTest.java
new file mode 100644
index 0000000..3794fa0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/NetUtilsTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.net;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+/**
+ * Tests for the network utilities.
+ */
+public class NetUtilsTest {
+
+ @Test
+ public void testFindConnectableAddress() {
+ int unusedPort;
+ try {
+ unusedPort = NetUtils.getAvailablePort();
+ }
+ catch (Throwable t) {
+ // if this system cannot find an available port,
+ // skip this test
+ return;
+ }
+
+ try {
+ // create an unreachable target address
+ InetSocketAddress unreachable = new InetSocketAddress("localhost", unusedPort);
+
+ final long start = System.currentTimeMillis();
+ InetAddress add = NetUtils.findConnectingAddress(unreachable, 2000, 400);
+
+ // check that it did not take forever
+ assertTrue(System.currentTimeMillis() - start < 8000);
+
+ // we should have found a heuristic address
+ assertNotNull(add);
+
+ // these checks are desirable, but will not work on every machine
+ // such as machines with no connected network media, which may
+ // default to a link local address
+ // assertFalse(add.isLinkLocalAddress());
+ // assertFalse(add.isLoopbackAddress());
+ // assertFalse(add.isAnyLocalAddress());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
[3/4] flink git commit: [runtime] Extend environment logging on
startup
Posted by se...@apache.org.
[runtime] Extend environment logging on startup
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/caa29417
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/caa29417
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/caa29417
Branch: refs/heads/master
Commit: caa29417d5bf6b234af06d7ccf31acb1d8e32fd9
Parents: 861ebe7
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Feb 27 13:44:28 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Mar 2 18:36:47 2015 +0100
----------------------------------------------------------------------
.../org/apache/flink/client/WebFrontend.java | 2 +-
.../runtime/util/EnvironmentInformation.java | 20 +++++++++++++++-----
.../flink/runtime/jobmanager/JobManager.scala | 8 +++++---
.../flink/runtime/taskmanager/TaskManager.scala | 2 +-
.../util/EnvironmentInformationTest.java | 15 +++++++++++++++
.../yarn/appMaster/YarnTaskManagerRunner.java | 2 +-
6 files changed, 38 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/caa29417/flink-clients/src/main/java/org/apache/flink/client/WebFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/WebFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/WebFrontend.java
index 45f4391..9587ab2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/WebFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/WebFrontend.java
@@ -43,7 +43,7 @@ public class WebFrontend {
*/
public static void main(String[] args) {
- EnvironmentInformation.logEnvironmentInfo(LOG, "Web Client");
+ EnvironmentInformation.logEnvironmentInfo(LOG, "Web Client", args);
EnvironmentInformation.checkJavaVersion();
// check the arguments
http://git-wip-us.apache.org/repos/asf/flink/blob/caa29417/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
index 1fb6422..793e158 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
@@ -208,7 +208,7 @@ public class EnvironmentInformation {
* @param log The logger to log the information to.
* @param componentName The component name to mention in the log.
*/
- public static void logEnvironmentInfo(Logger log, String componentName) {
+ public static void logEnvironmentInfo(Logger log, String componentName, String[] commandLineArgs) {
if (log.isInfoEnabled()) {
RevisionInformation rev = getRevisionInformation();
String version = getVersion();
@@ -227,19 +227,29 @@ public class EnvironmentInformation {
+ "Rev:" + rev.commitId + ", " + "Date:" + rev.commitDate + ")");
log.info(" Current user: " + user);
log.info(" JVM: " + jvmVersion);
+ log.info(" Maximum heap size: " + maxHeapMegabytes + " MiBytes");
+ log.info(" JAVA_HOME: " + (javaHome == null ? "(not set)" : javaHome));
if (options.length == 0) {
- log.info(" Startup Options: (none)");
+ log.info(" JVM Options: (none)");
}
else {
- log.info(" Startup Options:");
+ log.info(" JVM Options:");
for (String s: options) {
log.info(" " + s);
}
}
- log.info(" Maximum heap size: " + maxHeapMegabytes + " MiBytes");
- log.info(" JAVA_HOME: " + (javaHome == null ? "not set" : javaHome));
+ if (commandLineArgs == null || commandLineArgs.length == 0) {
+ log.info(" Program Arguments: (none)");
+ }
+ else {
+ log.info(" Program Arguments:");
+ for (String s: commandLineArgs) {
+ log.info(" " + s);
+ }
+ }
+
log.info("--------------------------------------------------------------------------------");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/caa29417/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 85ebdce..7238e3d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -608,7 +608,7 @@ object JobManager {
def main(args: Array[String]): Unit = {
// startup checks and logging
- EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager")
+ EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager", args)
EnvironmentInformation.checkJavaVersion()
// parsing the command line arguments
@@ -759,10 +759,12 @@ object JobManager {
opt[String]("configDir") action { (arg, c) => c.copy(configDir = arg) } text (
"The configuration directory.")
opt[String]("executionMode") optional() action { (arg, c) =>
- if(arg.equals("local")){
+ if (arg.equalsIgnoreCase("local")){
c.copy(executionMode = LOCAL)
- }else{
+ } else if (arg.equalsIgnoreCase("cluster")) {
c.copy(executionMode = CLUSTER)
+ } else {
+ throw new Exception("Unknown execution mode: " + arg)
}
} text {
"The execution mode of the JobManager (CLUSTER / LOCAL)"
http://git-wip-us.apache.org/repos/asf/flink/blob/caa29417/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 182f8cf..3d47258 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -743,7 +743,7 @@ object TaskManager {
*/
def main(args: Array[String]): Unit = {
// startup checks and logging
- EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager")
+ EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args)
EnvironmentInformation.checkJavaVersion()
// try to parse the command line arguments
http://git-wip-us.apache.org/repos/asf/flink/blob/caa29417/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java
index 64a676c..8da7b14 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.util;
import static org.junit.Assert.*;
import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
public class EnvironmentInformationTest {
@@ -58,4 +60,17 @@ public class EnvironmentInformationTest {
fail(e.getMessage());
}
}
+
+ @Test
+ public void testLogEnvironmentInformation() {
+ try {
+ Logger mockLogger = Mockito.mock(Logger.class);
+ EnvironmentInformation.logEnvironmentInfo(mockLogger, "test", new String[0]);
+ EnvironmentInformation.logEnvironmentInfo(mockLogger, "test", null);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/caa29417/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
index bed8f19..214798c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
@@ -46,7 +46,7 @@ public class YarnTaskManagerRunner {
public static void main(final String[] args) throws IOException {
- EnvironmentInformation.logEnvironmentInfo(LOG, "YARN TaskManager");
+ EnvironmentInformation.logEnvironmentInfo(LOG, "YARN TaskManager", args);
EnvironmentInformation.checkJavaVersion();
// try to parse the command line arguments