You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/01 09:16:54 UTC

[1/3] flink git commit: [FLINK-2781] [core] Cleanup NetUtils.

Repository: flink
Updated Branches:
  refs/heads/master 28d36cc92 -> c04a77042


[FLINK-2781] [core] Cleanup NetUtils.

  - The NetUtils class (in flink-core) contains all methods usable without runtime dependency
  - The runtime NetUtils class (to find connecting addresses) is now called ConnectionUtils.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3c0b446
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3c0b446
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3c0b446

Branch: refs/heads/master
Commit: a3c0b446383262a6643b0c0888d14b52ac732835
Parents: 28d36cc
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 29 16:36:04 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Oct 1 08:59:55 2015 +0200

----------------------------------------------------------------------
 .../client/program/ClientConnectionTest.java    |   2 +-
 .../apache/flink/client/program/ClientTest.java |   6 +-
 .../contrib/streaming/DataStreamUtils.java      |   4 +-
 .../java/org/apache/flink/util/NetUtils.java    |  21 +
 .../flink/runtime/net/ConnectionUtils.java      | 456 +++++++++++++++++
 .../org/apache/flink/runtime/net/NetUtils.java  | 495 -------------------
 .../runtime/util/LeaderRetrievalUtils.java      |   4 +-
 .../flink/runtime/taskmanager/TaskManager.scala |   2 +-
 .../io/network/NetworkEnvironmentTest.java      |   4 +-
 .../runtime/io/network/netty/NettyTestUtil.java |  14 +-
 .../jobmanager/JobManagerStartupTest.java       |   2 +-
 .../flink/runtime/jobmanager/JobSubmitTest.java |   4 +-
 .../flink/runtime/net/ConnectionUtilsTest.java  |  70 +++
 .../apache/flink/runtime/net/NetUtilsTest.java  |  70 ---
 .../TaskManagerProcessReapingTest.java          |   4 +-
 .../runtime/taskmanager/TaskManagerTest.java    |   4 +-
 .../jobmanager/JobManagerConnectionTest.scala   |   2 +-
 .../runtime/testingUtils/TestingCluster.scala   |   3 +-
 .../connectors/kafka/KafkaTestBase.java         |   2 +-
 .../streaming/util/SocketOutputTestBase.java    |   3 +-
 .../streaming/util/SocketProgramITCaseBase.java |   3 +-
 .../AbstractProcessFailureRecoveryTest.java     |   6 +-
 .../recovery/ProcessFailureCancelingITCase.java |   2 +-
 .../org/apache/flink/yarn/FlinkYarnCluster.java |   4 +-
 24 files changed, 585 insertions(+), 602 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
index ef2161e..8be6b03 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.util.NetUtils;
 import org.junit.Test;
 
 import java.net.InetAddress;

http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 621ef63..a620139 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -22,7 +22,6 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.actor.Status;
 
-
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.Plan;
@@ -44,20 +43,17 @@ import org.apache.flink.runtime.akka.FlinkUntypedActor;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.runtime.util.SerializedThrowable;
+import org.apache.flink.util.NetUtils;
 
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.util.Collections;
-
-
 import java.util.UUID;
 
 import static org.junit.Assert.*;

http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
index c965aa8..6f4e8d9 100644
--- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.runtime.net.ConnectionUtils;
 
 public final class DataStreamUtils {
 
@@ -46,7 +46,7 @@ public final class DataStreamUtils {
 			String host = ((RemoteStreamEnvironment)env).getHost();
 			int port = ((RemoteStreamEnvironment)env).getPort();
 			try {
-				clientAddress = NetUtils.findConnectingAddress(new InetSocketAddress(host, port), 2000, 400);
+				clientAddress = ConnectionUtils.findConnectingAddress(new InetSocketAddress(host, port), 2000, 400);
 			} catch (IOException e) {
 				throw new RuntimeException("IOException while trying to connect to the master", e);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
index abbcf3c..03721c5 100644
--- a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
@@ -18,7 +18,9 @@
 package org.apache.flink.util;
 
 
+import java.io.IOException;
 import java.net.MalformedURLException;
+import java.net.ServerSocket;
 import java.net.URL;
 
 public class NetUtils {
@@ -65,4 +67,23 @@ public class NetUtils {
 			throw new IllegalArgumentException("The given host:port ('"+hostPort+"') is invalid", e);
 		}
 	}
+
+	/**
+	 * Find a non-occupied port.
+	 *
+	 * @return A non-occupied port.
+	 */
+	public static int getAvailablePort() {
+		for (int i = 0; i < 50; i++) {
+			try (ServerSocket serverSocket = new ServerSocket(0)) {
+				int port = serverSocket.getLocalPort();
+				if (port != 0) {
+					return port;
+				}
+			}
+			catch (IOException ignored) {}
+		}
+
+		throw new RuntimeException("Could not find a free permitted port on the machine.");
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
new file mode 100644
index 0000000..0ed9345
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.net;
+
+import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.NetworkInterface;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.util.Enumeration;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Utilities to determine the network interface and address that should be used to bind the
+ * TaskManager communication to.
+ */
+public class ConnectionUtils {
+	
+	private static final Logger LOG = LoggerFactory.getLogger(ConnectionUtils.class);
+
+	private static final long MIN_SLEEP_TIME = 50;
+	private static final long MAX_SLEEP_TIME = 20000;
+
+	/**
+	 * The states of address detection mechanism.
+	 * There is only a state transition if the current state failed to determine the address.
+	 */
+	private enum AddressDetectionState {
+		/** Connect from interface returned by InetAddress.getLocalHost() **/
+		LOCAL_HOST(50),
+		/** Detect own IP address based on the target IP address. Look for common prefix */
+		ADDRESS(50),
+		/** Try to connect on all Interfaces and all their addresses with a low timeout */
+		FAST_CONNECT(50),
+		/** Try to connect on all Interfaces and all their addresses with a long timeout */
+		SLOW_CONNECT(1000),
+		/** Choose any non-loopback address */
+		HEURISTIC(0);
+
+		private int timeout;
+
+		AddressDetectionState(int timeout) {
+			this.timeout = timeout;
+		}
+
+		public int getTimeout() {
+			return timeout;
+		}
+	}
+
+
+	/**
+	 * Finds the local network address from which this machine can connect to the target
+	 * address. This method tries to establish a proper network connection to the
+	 * given target, so it only succeeds if the target socket address actually accepts
+	 * connections. The method tries various strategies multiple times and uses an exponential
+	 * backoff timer between tries.
+	 * <p>
+	 * If no connection attempt was successful after the given maximum time, the method
+	 * will choose some address based on heuristics (excluding link-local and loopback addresses.)
+	 * <p>
+	 * This method will initially not log on info level (to not flood the log while the
+	 * backoff time is still very low). It will start logging after a certain time
+	 * has passes.
+	 *
+	 * @param targetAddress The address that the method tries to connect to.
+	 * @param maxWaitMillis The maximum time that this method tries to connect, before falling
+	 *                       back to the heuristics.
+	 * @param startLoggingAfter The time after which the method will log on INFO level.
+	 */
+	public static InetAddress findConnectingAddress(InetSocketAddress targetAddress,
+							long maxWaitMillis, long startLoggingAfter) throws IOException
+	{
+		if (targetAddress == null) {
+			throw new NullPointerException("targetAddress must not be null");
+		}
+		if (maxWaitMillis <= 0) {
+			throw new IllegalArgumentException("Max wait time must be positive");
+		}
+
+		final long startTime = System.currentTimeMillis();
+
+		long currentSleepTime = MIN_SLEEP_TIME;
+		long elapsedTime = 0;
+
+		// loop while there is time left
+		while (elapsedTime < maxWaitMillis) {
+			AddressDetectionState strategy = AddressDetectionState.LOCAL_HOST;
+
+			boolean logging = elapsedTime >= startLoggingAfter;
+			if (logging) {
+				LOG.info("Trying to connect to " + targetAddress);
+			}
+			// go over the strategies ADDRESS - FAST_CONNECT - SLOW_CONNECT
+			do {
+				InetAddress address = findAddressUsingStrategy(strategy, targetAddress, logging);
+				if (address != null) {
+					return address;
+				}
+
+				// pick the next strategy
+				switch (strategy) {
+					case LOCAL_HOST:
+						strategy = AddressDetectionState.ADDRESS;
+						break;
+					case ADDRESS:
+						strategy = AddressDetectionState.FAST_CONNECT;
+						break;
+					case FAST_CONNECT:
+						strategy = AddressDetectionState.SLOW_CONNECT;
+						break;
+					case SLOW_CONNECT:
+						strategy = null;
+						break;
+					default:
+						throw new RuntimeException("Unsupported strategy: " + strategy);
+				}
+			}
+			while (strategy != null);
+
+			// we have made a pass with all strategies over all interfaces
+			// sleep for a while before we make the next pass
+			elapsedTime = System.currentTimeMillis() - startTime;
+
+			long toWait = Math.min(maxWaitMillis - elapsedTime, currentSleepTime);
+			if (toWait > 0) {
+				if (logging) {
+					LOG.info("Could not connect. Waiting for {} msecs before next attempt", toWait);
+				} else {
+					LOG.debug("Could not connect. Waiting for {} msecs before next attempt", toWait);
+				}
+
+				try {
+					Thread.sleep(toWait);
+				}
+				catch (InterruptedException e) {
+					throw new IOException("Connection attempts have been interrupted.");
+				}
+			}
+
+			// increase the exponential backoff timer
+			currentSleepTime = Math.min(2 * currentSleepTime, MAX_SLEEP_TIME);
+		}
+
+		// our attempts timed out. use the heuristic fallback
+		LOG.warn("Could not connect to {}. Selecting a local address using heuristics.", targetAddress);
+		InetAddress heuristic = findAddressUsingStrategy(AddressDetectionState.HEURISTIC, targetAddress, true);
+		if (heuristic != null) {
+			return heuristic;
+		}
+		else {
+			LOG.warn("Could not find any IPv4 address that is not loopback or link-local. Using localhost address.");
+			return InetAddress.getLocalHost();
+		}
+	}
+
+
+	private static InetAddress findAddressUsingStrategy(AddressDetectionState strategy,
+														InetSocketAddress targetAddress,
+														boolean logging) throws IOException
+	{
+		// try LOCAL_HOST strategy independent of the network interfaces
+		if(strategy == AddressDetectionState.LOCAL_HOST) {
+			InetAddress localhostName = InetAddress.getLocalHost();
+
+			if(tryToConnect(localhostName, targetAddress, strategy.getTimeout(), logging)) {
+				LOG.debug("Using InetAddress.getLocalHost() immediately for the connecting address");
+				return localhostName;
+			} else {
+				return null;
+			}
+		}
+
+		final byte[] targetAddressBytes = targetAddress.getAddress().getAddress();
+
+		// for each network interface
+		Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
+		while (e.hasMoreElements()) {
+
+			NetworkInterface netInterface = e.nextElement();
+
+			// for each address of the network interface
+			Enumeration<InetAddress> ee = netInterface.getInetAddresses();
+			while (ee.hasMoreElements()) {
+				InetAddress interfaceAddress = ee.nextElement();
+
+				switch (strategy) {
+					case ADDRESS:
+						if (hasCommonPrefix(targetAddressBytes, interfaceAddress.getAddress())) {
+							LOG.debug("Target address {} and local address {} share prefix - trying to connect.",
+										targetAddress, interfaceAddress);
+
+							if (tryToConnect(interfaceAddress, targetAddress, strategy.getTimeout(), logging)) {
+								return interfaceAddress;
+							}
+						}
+						break;
+
+					case FAST_CONNECT:
+					case SLOW_CONNECT:
+						LOG.debug("Trying to connect to {} from local address {} with timeout {}",
+								targetAddress, interfaceAddress, strategy.getTimeout());
+
+						if (tryToConnect(interfaceAddress, targetAddress, strategy.getTimeout(), logging)) {
+							return interfaceAddress;
+						}
+						break;
+
+					case HEURISTIC:
+						if (LOG.isDebugEnabled()) {
+							LOG.debug("Checking address {} using heuristics: linkLocal: {} loopback: {}",
+									interfaceAddress, interfaceAddress.isLinkLocalAddress(),
+									interfaceAddress.isLoopbackAddress());
+						}
+						// pick a non-loopback non-link-local address
+						if (interfaceAddress instanceof Inet4Address && !interfaceAddress.isLinkLocalAddress() &&
+								!interfaceAddress.isLoopbackAddress())
+						{
+							return interfaceAddress;
+						}
+						break;
+
+					default:
+						throw new RuntimeException("Unsupported strategy: " + strategy);
+				}
+			} // end for each address of the interface
+		} // end for each interface
+
+		return null;
+	}
+
+	/**
+	 * Checks if two addresses have a common prefix (first 2 bytes).
+	 * Example: 192.168.???.???
+	 * Works also with ipv6, but accepts probably too many addresses
+	 */
+	private static boolean hasCommonPrefix(byte[] address, byte[] address2) {
+		return address[0] == address2[0] && address[1] == address2[1];
+	}
+
+	/**
+	 *
+	 * @param fromAddress The address to connect from.
+	 * @param toSocket The socket address to connect to.
+	 * @param timeout The timeout fr the connection.
+	 * @param logFailed Flag to indicate whether to log failed attempts on info level
+	 *                  (failed attempts are always logged on DEBUG level).
+	 * @return True, if the connection was successful, false otherwise.
+	 * @throws IOException Thrown if the socket cleanup fails.
+	 */
+	private static boolean tryToConnect(InetAddress fromAddress, SocketAddress toSocket,
+										int timeout, boolean logFailed) throws IOException
+	{
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Trying to connect to (" + toSocket + ") from local address " + fromAddress
+					+ " with timeout " + timeout);
+		}
+		try (Socket socket = new Socket()) {
+			// port 0 = let the OS choose the port
+			SocketAddress bindP = new InetSocketAddress(fromAddress, 0);
+			// machine
+			socket.bind(bindP);
+			socket.connect(toSocket, timeout);
+			return true;
+		}
+		catch (Exception ex) {
+			String message = "Failed to connect from address '" + fromAddress + "': " + ex.getMessage();
+			if (LOG.isDebugEnabled()) {
+				LOG.debug(message, ex);
+			} else if (logFailed) {
+				LOG.info(message);
+			}
+			return false;
+		}
+	}
+
+	public static class LeaderConnectingAddressListener implements LeaderRetrievalListener {
+
+		private static final FiniteDuration defaultLoggingDelay = new FiniteDuration(400, TimeUnit.MILLISECONDS);
+
+		private enum LeaderRetrievalState {
+			NOT_RETRIEVED,
+			RETRIEVED,
+			NEWLY_RETRIEVED
+		}
+
+		final private Object retrievalLock = new Object();
+
+		private String akkaURL;
+		private LeaderRetrievalState retrievalState = LeaderRetrievalState.NOT_RETRIEVED;
+		private Exception exception;
+
+		public InetAddress findConnectingAddress(
+				FiniteDuration timeout) throws LeaderRetrievalException {
+			return findConnectingAddress(timeout, defaultLoggingDelay);
+		}
+
+		public InetAddress findConnectingAddress(
+				FiniteDuration timeout,
+				FiniteDuration startLoggingAfter)
+			throws LeaderRetrievalException {
+			long startTime = System.currentTimeMillis();
+			long currentSleepTime = MIN_SLEEP_TIME;
+			long elapsedTime = 0;
+			InetSocketAddress targetAddress = null;
+
+			try {
+				while (elapsedTime < timeout.toMillis()) {
+
+					long maxTimeout = timeout.toMillis() - elapsedTime;
+
+					synchronized (retrievalLock) {
+						if (exception != null) {
+							throw exception;
+						}
+
+						if (retrievalState == LeaderRetrievalState.NOT_RETRIEVED) {
+							try {
+								retrievalLock.wait(maxTimeout);
+							} catch (InterruptedException e) {
+								throw new Exception("Finding connecting address was interrupted" +
+										"while waiting for the leader retrieval.");
+							}
+						} else if (retrievalState == LeaderRetrievalState.NEWLY_RETRIEVED) {
+							targetAddress = AkkaUtils.getInetSockeAddressFromAkkaURL(akkaURL);
+
+							LOG.info("Retrieved new target address {}.", targetAddress);
+
+							retrievalState = LeaderRetrievalState.RETRIEVED;
+
+							currentSleepTime = MIN_SLEEP_TIME;
+						} else {
+							currentSleepTime = Math.min(2 * currentSleepTime, MAX_SLEEP_TIME);
+						}
+					}
+
+					if (targetAddress != null) {
+						AddressDetectionState strategy = AddressDetectionState.LOCAL_HOST;
+
+						boolean logging = elapsedTime >= startLoggingAfter.toMillis();
+						if (logging) {
+							LOG.info("Trying to connect to address {}." + targetAddress);
+						}
+
+						do {
+							InetAddress address = ConnectionUtils.findAddressUsingStrategy(strategy, targetAddress, logging);
+							if (address != null) {
+								return address;
+							}
+
+							// pick the next strategy
+							switch (strategy) {
+								case LOCAL_HOST:
+									strategy = AddressDetectionState.ADDRESS;
+									break;
+								case ADDRESS:
+									strategy = AddressDetectionState.FAST_CONNECT;
+									break;
+								case FAST_CONNECT:
+									strategy = AddressDetectionState.SLOW_CONNECT;
+									break;
+								case SLOW_CONNECT:
+									strategy = null;
+									break;
+								default:
+									throw new RuntimeException("Unsupported strategy: " + strategy);
+							}
+						}
+						while (strategy != null);
+					}
+
+					elapsedTime = System.currentTimeMillis() - startTime;
+
+					long timeToWait = Math.min(
+							Math.max(timeout.toMillis() - elapsedTime, 0),
+							currentSleepTime);
+
+					if (timeToWait > 0) {
+						synchronized (retrievalLock) {
+							try {
+								retrievalLock.wait(timeToWait);
+							} catch (InterruptedException e) {
+								throw new Exception("Finding connecting address was interrupted while pausing.");
+							}
+						}
+
+						elapsedTime = System.currentTimeMillis() - startTime;
+					}
+				}
+
+				InetAddress heuristic = null;
+
+				if (targetAddress != null) {
+					LOG.warn("Could not connect to {}. Selecting a local address using heuristics.", targetAddress);
+					heuristic = findAddressUsingStrategy(AddressDetectionState.HEURISTIC, targetAddress, true);
+				}
+
+				if (heuristic != null) {
+					return heuristic;
+				} else {
+					LOG.warn("Could not find any IPv4 address that is not loopback or link-local. Using localhost address.");
+					return InetAddress.getLocalHost();
+				}
+			} catch (Exception e) {
+				throw new LeaderRetrievalException("Could not retrieve the connecting address to the " +
+						"current leader with the akka URL " + akkaURL + ".", e);
+			}
+		}
+
+		@Override
+		public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
+			if (leaderAddress != null && !leaderAddress.equals("")) {
+				synchronized (retrievalLock) {
+					akkaURL = leaderAddress;
+					retrievalState = LeaderRetrievalState.NEWLY_RETRIEVED;
+
+					retrievalLock.notifyAll();
+				}
+			}
+		}
+
+		@Override
+		public void handleError(Exception exception) {
+			synchronized (retrievalLock) {
+				this.exception = exception;
+				retrievalLock.notifyAll();
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
deleted file mode 100644
index 2df0616..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
+++ /dev/null
@@ -1,495 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.net;
-
-import java.io.IOException;
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.NetworkInterface;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.util.Enumeration;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * Utilities to determine the network interface and address that should be used to bind the
- * TaskManager communication to.
- */
-public class NetUtils {
-	
-	private static final Logger LOG = LoggerFactory.getLogger(NetUtils.class);
-
-	private static final long MIN_SLEEP_TIME = 50;
-	private static final long MAX_SLEEP_TIME = 20000;
-
-	/**
-	 * The states of address detection mechanism.
-	 * There is only a state transition if the current state failed to determine the address.
-	 */
-	private enum AddressDetectionState {
-		/** Connect from interface returned by InetAddress.getLocalHost() **/
-		LOCAL_HOST(50),
-		/** Detect own IP address based on the target IP address. Look for common prefix */
-		ADDRESS(50),
-		/** Try to connect on all Interfaces and all their addresses with a low timeout */
-		FAST_CONNECT(50),
-		/** Try to connect on all Interfaces and all their addresses with a long timeout */
-		SLOW_CONNECT(1000),
-		/** Choose any non-loopback address */
-		HEURISTIC(0);
-
-		private int timeout;
-
-		AddressDetectionState(int timeout) {
-			this.timeout = timeout;
-		}
-
-		public int getTimeout() {
-			return timeout;
-		}
-	}
-
-
-	/**
-	 * Finds the local network address from which this machine can connect to the target
-	 * address. This method tries to establish a proper network connection to the
-	 * given target, so it only succeeds if the target socket address actually accepts
-	 * connections. The method tries various strategies multiple times and uses an exponential
-	 * backoff timer between tries.
-	 * <p>
-	 * If no connection attempt was successful after the given maximum time, the method
-	 * will choose some address based on heuristics (excluding link-local and loopback addresses.)
-	 * <p>
-	 * This method will initially not log on info level (to not flood the log while the
-	 * backoff time is still very low). It will start logging after a certain time
-	 * has passes.
-	 *
-	 * @param targetAddress The address that the method tries to connect to.
-	 * @param maxWaitMillis The maximum time that this method tries to connect, before falling
-	 *                       back to the heuristics.
-	 * @param startLoggingAfter The time after which the method will log on INFO level.
-	 */
-	public static InetAddress findConnectingAddress(InetSocketAddress targetAddress,
-							long maxWaitMillis, long startLoggingAfter) throws IOException
-	{
-		if (targetAddress == null) {
-			throw new NullPointerException("targetAddress must not be null");
-		}
-		if (maxWaitMillis <= 0) {
-			throw new IllegalArgumentException("Max wait time must be positive");
-		}
-
-		final long startTime = System.currentTimeMillis();
-
-		long currentSleepTime = MIN_SLEEP_TIME;
-		long elapsedTime = 0;
-
-		// loop while there is time left
-		while (elapsedTime < maxWaitMillis) {
-			AddressDetectionState strategy = AddressDetectionState.LOCAL_HOST;
-
-			boolean logging = elapsedTime >= startLoggingAfter;
-			if (logging) {
-				LOG.info("Trying to connect to " + targetAddress);
-			}
-			// go over the strategies ADDRESS - FAST_CONNECT - SLOW_CONNECT
-			do {
-				InetAddress address = findAddressUsingStrategy(strategy, targetAddress, logging);
-				if (address != null) {
-					return address;
-				}
-
-				// pick the next strategy
-				switch (strategy) {
-					case LOCAL_HOST:
-						strategy = AddressDetectionState.ADDRESS;
-						break;
-					case ADDRESS:
-						strategy = AddressDetectionState.FAST_CONNECT;
-						break;
-					case FAST_CONNECT:
-						strategy = AddressDetectionState.SLOW_CONNECT;
-						break;
-					case SLOW_CONNECT:
-						strategy = null;
-						break;
-					default:
-						throw new RuntimeException("Unsupported strategy: " + strategy);
-				}
-			}
-			while (strategy != null);
-
-			// we have made a pass with all strategies over all interfaces
-			// sleep for a while before we make the next pass
-			elapsedTime = System.currentTimeMillis() - startTime;
-
-			long toWait = Math.min(maxWaitMillis - elapsedTime, currentSleepTime);
-			if (toWait > 0) {
-				if (logging) {
-					LOG.info("Could not connect. Waiting for {} msecs before next attempt", toWait);
-				} else {
-					LOG.debug("Could not connect. Waiting for {} msecs before next attempt", toWait);
-				}
-
-				try {
-					Thread.sleep(toWait);
-				}
-				catch (InterruptedException e) {
-					throw new IOException("Connection attempts have been interrupted.");
-				}
-			}
-
-			// increase the exponential backoff timer
-			currentSleepTime = Math.min(2 * currentSleepTime, MAX_SLEEP_TIME);
-		}
-
-		// our attempts timed out. use the heuristic fallback
-		LOG.warn("Could not connect to {}. Selecting a local address using heuristics.", targetAddress);
-		InetAddress heuristic = findAddressUsingStrategy(AddressDetectionState.HEURISTIC, targetAddress, true);
-		if (heuristic != null) {
-			return heuristic;
-		}
-		else {
-			LOG.warn("Could not find any IPv4 address that is not loopback or link-local. Using localhost address.");
-			return InetAddress.getLocalHost();
-		}
-	}
-
-
-	private static InetAddress findAddressUsingStrategy(AddressDetectionState strategy,
-														InetSocketAddress targetAddress,
-														boolean logging) throws IOException
-	{
-		// try LOCAL_HOST strategy independent of the network interfaces
-		if(strategy == AddressDetectionState.LOCAL_HOST) {
-			InetAddress localhostName = InetAddress.getLocalHost();
-
-			if(tryToConnect(localhostName, targetAddress, strategy.getTimeout(), logging)) {
-				LOG.debug("Using InetAddress.getLocalHost() immediately for the connecting address");
-				return localhostName;
-			} else {
-				return null;
-			}
-		}
-
-		final byte[] targetAddressBytes = targetAddress.getAddress().getAddress();
-
-		// for each network interface
-		Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
-		while (e.hasMoreElements()) {
-
-			NetworkInterface netInterface = e.nextElement();
-
-			// for each address of the network interface
-			Enumeration<InetAddress> ee = netInterface.getInetAddresses();
-			while (ee.hasMoreElements()) {
-				InetAddress interfaceAddress = ee.nextElement();
-
-				switch (strategy) {
-					case ADDRESS:
-						if (hasCommonPrefix(targetAddressBytes, interfaceAddress.getAddress())) {
-							LOG.debug("Target address {} and local address {} share prefix - trying to connect.",
-										targetAddress, interfaceAddress);
-
-							if (tryToConnect(interfaceAddress, targetAddress, strategy.getTimeout(), logging)) {
-								return interfaceAddress;
-							}
-						}
-						break;
-
-					case FAST_CONNECT:
-					case SLOW_CONNECT:
-						LOG.debug("Trying to connect to {} from local address {} with timeout {}",
-								targetAddress, interfaceAddress, strategy.getTimeout());
-
-						if (tryToConnect(interfaceAddress, targetAddress, strategy.getTimeout(), logging)) {
-							return interfaceAddress;
-						}
-						break;
-
-					case HEURISTIC:
-						if (LOG.isDebugEnabled()) {
-							LOG.debug("Checking address {} using heuristics: linkLocal: {} loopback: {}",
-									interfaceAddress, interfaceAddress.isLinkLocalAddress(),
-									interfaceAddress.isLoopbackAddress());
-						}
-						// pick a non-loopback non-link-local address
-						if (interfaceAddress instanceof Inet4Address && !interfaceAddress.isLinkLocalAddress() &&
-								!interfaceAddress.isLoopbackAddress())
-						{
-							return interfaceAddress;
-						}
-						break;
-
-					default:
-						throw new RuntimeException("Unsupported strategy: " + strategy);
-				}
-			} // end for each address of the interface
-		} // end for each interface
-
-		return null;
-	}
-
-	/**
-	 * Checks if two addresses have a common prefix (first 2 bytes).
-	 * Example: 192.168.???.???
-	 * Works also with ipv6, but accepts probably too many addresses
-	 */
-	private static boolean hasCommonPrefix(byte[] address, byte[] address2) {
-		return address[0] == address2[0] && address[1] == address2[1];
-	}
-
-	/**
-	 *
-	 * @param fromAddress The address to connect from.
-	 * @param toSocket The socket address to connect to.
-	 * @param timeout The timeout fr the connection.
-	 * @param logFailed Flag to indicate whether to log failed attempts on info level
-	 *                  (failed attempts are always logged on DEBUG level).
-	 * @return True, if the connection was successful, false otherwise.
-	 * @throws IOException Thrown if the socket cleanup fails.
-	 */
-	private static boolean tryToConnect(InetAddress fromAddress, SocketAddress toSocket,
-										int timeout, boolean logFailed) throws IOException
-	{
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Trying to connect to (" + toSocket + ") from local address " + fromAddress
-					+ " with timeout " + timeout);
-		}
-		Socket socket = new Socket();
-		try {
-			// port 0 = let the OS choose the port
-			SocketAddress bindP = new InetSocketAddress(fromAddress, 0);
-			// machine
-			socket.bind(bindP);
-			socket.connect(toSocket, timeout);
-			return true;
-		}
-		catch (Exception ex) {
-			String message = "Failed to connect from address '" + fromAddress + "': " + ex.getMessage();
-			if (LOG.isDebugEnabled()) {
-				LOG.debug(message, ex);
-			} else if (logFailed) {
-				LOG.info(message);
-			}
-			return false;
-		}
-		finally {
-			socket.close();
-		}
-	}
-
-	/**
-	 * Find a non-occupied port.
-	 *
-	 * @return A non-occupied port.
-	 */
-	public static int getAvailablePort() {
-		for (int i = 0; i < 50; i++) {
-			ServerSocket serverSocket = null;
-			try {
-				serverSocket = new ServerSocket(0);
-				int port = serverSocket.getLocalPort();
-				if (port != 0) {
-					return port;
-				}
-			}
-			catch (IOException e) {
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("Unable to allocate port " + e.getMessage(), e);
-				}
-			}
-			finally {
-				if (serverSocket != null) {
-					try {
-						serverSocket.close();
-					} catch (Throwable t) {
-						// ignored
-					}
-				}
-			}
-		}
-
-		throw new RuntimeException("Could not find a free permitted port on the machine.");
-	}
-
-	public static class LeaderConnectingAddressListener implements LeaderRetrievalListener {
-
-		private static final FiniteDuration defaultLoggingDelay = new FiniteDuration(400, TimeUnit.MILLISECONDS);
-
-		private enum LeaderRetrievalState {
-			NOT_RETRIEVED,
-			RETRIEVED,
-			NEWLY_RETRIEVED
-		}
-
-		final private Object retrievalLock = new Object();
-
-		private String akkaURL;
-		private LeaderRetrievalState retrievalState = LeaderRetrievalState.NOT_RETRIEVED;
-		private Exception exception;
-
-		public InetAddress findConnectingAddress(
-				FiniteDuration timeout) throws LeaderRetrievalException {
-			return findConnectingAddress(timeout, defaultLoggingDelay);
-		}
-
-		public InetAddress findConnectingAddress(
-				FiniteDuration timeout,
-				FiniteDuration startLoggingAfter)
-			throws LeaderRetrievalException {
-			long startTime = System.currentTimeMillis();
-			long currentSleepTime = MIN_SLEEP_TIME;
-			long elapsedTime = 0;
-			InetSocketAddress targetAddress = null;
-
-			try {
-				while (elapsedTime < timeout.toMillis()) {
-
-					long maxTimeout = timeout.toMillis() - elapsedTime;
-
-					synchronized (retrievalLock) {
-						if (exception != null) {
-							throw exception;
-						}
-
-						if (retrievalState == LeaderRetrievalState.NOT_RETRIEVED) {
-							try {
-								retrievalLock.wait(maxTimeout);
-							} catch (InterruptedException e) {
-								throw new Exception("Finding connecting address was interrupted" +
-										"while waiting for the leader retrieval.");
-							}
-						} else if (retrievalState == LeaderRetrievalState.NEWLY_RETRIEVED) {
-							targetAddress = AkkaUtils.getInetSockeAddressFromAkkaURL(akkaURL);
-
-							LOG.info("Retrieved new target address {}.", targetAddress);
-
-							retrievalState = LeaderRetrievalState.RETRIEVED;
-
-							currentSleepTime = MIN_SLEEP_TIME;
-						} else {
-							currentSleepTime = Math.min(2 * currentSleepTime, MAX_SLEEP_TIME);
-						}
-					}
-
-					if (targetAddress != null) {
-						AddressDetectionState strategy = AddressDetectionState.LOCAL_HOST;
-
-						boolean logging = elapsedTime >= startLoggingAfter.toMillis();
-						if (logging) {
-							LOG.info("Trying to connect to address {}." + targetAddress);
-						}
-
-						do {
-							InetAddress address = NetUtils.findAddressUsingStrategy(strategy, targetAddress, logging);
-							if (address != null) {
-								return address;
-							}
-
-							// pick the next strategy
-							switch (strategy) {
-								case LOCAL_HOST:
-									strategy = AddressDetectionState.ADDRESS;
-									break;
-								case ADDRESS:
-									strategy = AddressDetectionState.FAST_CONNECT;
-									break;
-								case FAST_CONNECT:
-									strategy = AddressDetectionState.SLOW_CONNECT;
-									break;
-								case SLOW_CONNECT:
-									strategy = null;
-									break;
-								default:
-									throw new RuntimeException("Unsupported strategy: " + strategy);
-							}
-						}
-						while (strategy != null);
-					}
-
-					elapsedTime = System.currentTimeMillis() - startTime;
-
-					long timeToWait = Math.min(
-							Math.max(timeout.toMillis() - elapsedTime, 0),
-							currentSleepTime);
-
-					if (timeToWait > 0) {
-						synchronized (retrievalLock) {
-							try {
-								retrievalLock.wait(timeToWait);
-							} catch (InterruptedException e) {
-								throw new Exception("Finding connecting address was interrupted while pausing.");
-							}
-						}
-
-						elapsedTime = System.currentTimeMillis() - startTime;
-					}
-				}
-
-				InetAddress heuristic = null;
-
-				if (targetAddress != null) {
-					LOG.warn("Could not connect to {}. Selecting a local address using heuristics.", targetAddress);
-					heuristic = findAddressUsingStrategy(AddressDetectionState.HEURISTIC, targetAddress, true);
-				}
-
-				if (heuristic != null) {
-					return heuristic;
-				} else {
-					LOG.warn("Could not find any IPv4 address that is not loopback or link-local. Using localhost address.");
-					return InetAddress.getLocalHost();
-				}
-			} catch (Exception e) {
-				throw new LeaderRetrievalException("Could not retrieve the connecting address to the " +
-						"current leader with the akka URL " + akkaURL + ".", e);
-			}
-		}
-
-		@Override
-		public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
-			if (leaderAddress != null && !leaderAddress.equals("")) {
-				synchronized (retrievalLock) {
-					akkaURL = leaderAddress;
-					retrievalState = LeaderRetrievalState.NEWLY_RETRIEVED;
-
-					retrievalLock.notifyAll();
-				}
-			}
-		}
-
-		@Override
-		public void handleError(Exception exception) {
-			synchronized (retrievalLock) {
-				this.exception = exception;
-				retrievalLock.notifyAll();
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
index 704a482..5cf5bff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.runtime.net.ConnectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
@@ -149,7 +149,7 @@ public class LeaderRetrievalUtils {
 	public static InetAddress findConnectingAddress(
 			LeaderRetrievalService leaderRetrievalService,
 			FiniteDuration timeout) throws LeaderRetrievalException {
-		NetUtils.LeaderConnectingAddressListener listener = new NetUtils.LeaderConnectingAddressListener();
+		ConnectionUtils.LeaderConnectingAddressListener listener = new ConnectionUtils.LeaderConnectingAddressListener();
 
 		try {
 			leaderRetrievalService.start(listener);

http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index aba9c60..7a1bec5 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -60,7 +60,7 @@ import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.messages.Messages._
 import org.apache.flink.runtime.messages.RegistrationMessages._
 import org.apache.flink.runtime.messages.TaskManagerMessages._
-import org.apache.flink.runtime.net.NetUtils
+import org.apache.flink.util.NetUtils
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner

http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index 420199c..bcf8837 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -26,10 +26,12 @@ import org.apache.flink.runtime.instance.DummyActorGateway;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
-import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.NetUtils;
+
 import org.junit.Test;
+
 import scala.Some;
 import scala.Tuple2;
 import scala.concurrent.duration.FiniteDuration;

http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
index 538901f..bf01422 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
@@ -19,8 +19,10 @@
 package org.apache.flink.runtime.io.network.netty;
 
 import io.netty.channel.Channel;
+
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.util.NetUtils;
+
 import scala.Tuple2;
 
 import java.net.InetAddress;
@@ -48,10 +50,7 @@ public class NettyTestUtil {
 			server.init(protocol);
 		}
 		catch (Exception e) {
-			if (server != null) {
-				server.shutdown();
-			}
-
+			server.shutdown();
 			throw e;
 		}
 
@@ -65,10 +64,7 @@ public class NettyTestUtil {
 			client.init(protocol);
 		}
 		catch (Exception e) {
-			if (client != null) {
-				client.shutdown();
-			}
-
+			client.shutdown();
 			throw e;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
index 6125479..c52ec2e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
@@ -30,7 +30,7 @@ import com.google.common.io.Files;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.StreamingMode;
-import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.util.NetUtils;
 
 import org.apache.flink.util.OperatingSystem;
 import org.junit.After;

http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index bba1460..edc1f92 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -33,11 +33,13 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.util.NetUtils;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+
 import scala.Tuple2;
 import scala.concurrent.Await;
 import scala.concurrent.Future;

http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
new file mode 100644
index 0000000..e003634
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.net;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.util.OperatingSystem;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+/**
+ * Tests for the network utilities.
+ */
+public class ConnectionUtilsTest {
+
+	@Test
+	public void testFindConnectableAddress() {
+		int unusedPort;
+		try {
+			unusedPort = org.apache.flink.util.NetUtils.getAvailablePort();
+		}
+		catch (Throwable t) {
+			// if this system cannot find an available port,
+			// skip this test
+			return;
+		}
+
+		try {
+			// create an unreachable target address
+			InetSocketAddress unreachable = new InetSocketAddress("localhost", unusedPort);
+
+			final long start = System.currentTimeMillis();
+			InetAddress add = ConnectionUtils.findConnectingAddress(unreachable, 2000, 400);
+
+			// check that it did not take forever
+			assertTrue(System.currentTimeMillis() - start < (OperatingSystem.isWindows() ? 30000 : 8000));
+
+			// we should have found a heuristic address
+			assertNotNull(add);
+
+			// these checks are desirable, but will not work on every machine
+			// such as machines with no connected network media, which may
+			// default to a link local address
+			// assertFalse(add.isLinkLocalAddress());
+			// assertFalse(add.isLoopbackAddress());
+			// assertFalse(add.isAnyLocalAddress());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-runtime/src/test/java/org/apache/flink/runtime/net/NetUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/NetUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/NetUtilsTest.java
deleted file mode 100644
index fe4c82c..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/net/NetUtilsTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.runtime.net;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.util.OperatingSystem;
-import org.junit.Test;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-
-/**
- * Tests for the network utilities.
- */
-public class NetUtilsTest {
-
-	@Test
-	public void testFindConnectableAddress() {
-		int unusedPort;
-		try {
-			unusedPort = NetUtils.getAvailablePort();
-		}
-		catch (Throwable t) {
-			// if this system cannot find an available port,
-			// skip this test
-			return;
-		}
-
-		try {
-			// create an unreachable target address
-			InetSocketAddress unreachable = new InetSocketAddress("localhost", unusedPort);
-
-			final long start = System.currentTimeMillis();
-			InetAddress add = NetUtils.findConnectingAddress(unreachable, 2000, 400);
-
-			// check that it did not take forever
-			assertTrue(System.currentTimeMillis() - start < (OperatingSystem.isWindows() ? 30000 : 8000));
-
-			// we should have found a heuristic address
-			assertNotNull(add);
-
-			// these checks are desirable, but will not work on every machine
-			// such as machines with no connected network media, which may
-			// default to a link local address
-			// assertFalse(add.isLinkLocalAddress());
-			// assertFalse(add.isLoopbackAddress());
-			// assertFalse(add.isAnyLocalAddress());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
index 075c1c2..1334bcc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
@@ -26,10 +26,10 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
-
 import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.NetUtils;
+
 import org.junit.Test;
 
 import scala.Some;

http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 7b0b8f7..461138c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -57,10 +57,10 @@ import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
 import org.apache.flink.runtime.messages.TaskMessages.PartitionState;
 import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
-import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
-
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.NetUtils;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
index b0fe695..6013309 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.net.NetUtils
+import org.apache.flink.util.NetUtils
 import org.junit.Assert._
 import org.junit.Test
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index d762ab4..795ad4e 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -28,10 +28,9 @@ import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
 import org.apache.flink.runtime.minicluster.FlinkMiniCluster
-import org.apache.flink.runtime.net.NetUtils
+import org.apache.flink.util.NetUtils
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.testingUtils.TestingMessages.Alive
-import org.apache.flink.runtime.webmonitor.WebMonitor
 
 import scala.concurrent.{Await, Future}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index cfc104b..61f384a 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -33,11 +33,11 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.internals.ZooKeeperStringSerializer;
 import org.apache.flink.streaming.connectors.kafka.testutils.SuccessException;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.kafka.common.PartitionInfo;

http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java
index a6e1e7e..4ded0fa 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java
@@ -19,10 +19,11 @@ package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.util.NetUtils;
+
 import org.junit.Assert;
 
 import java.io.BufferedReader;

http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java
index 37f6958..d1bd64a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java
@@ -17,8 +17,9 @@
 
 package org.apache.flink.streaming.util;
 
-import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.util.NetUtils;
+
 import org.junit.Assert;
 
 import java.io.PrintWriter;

http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
index 8abe52e..affe134 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
@@ -22,6 +22,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -29,13 +30,16 @@ import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.Some;
 import scala.Tuple2;
 import scala.concurrent.Await;

http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index f1745ee..ac308dd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -35,8 +35,8 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.NetUtils;
 
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a3c0b446/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
index bd1698a..a1a5205 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
@@ -28,7 +28,7 @@ import akka.util.Timeout;
 import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.runtime.net.ConnectionUtils;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
 import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
 import org.apache.hadoop.conf.Configuration;
@@ -143,7 +143,7 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
 		LOG.info("Start actor system.");
 		// find name of own public interface, able to connect to the JM
 		// try to find address for 2 seconds. log after 400 ms.
-		InetAddress ownHostname = NetUtils.findConnectingAddress(jobManagerAddress, 2000, 400);
+		InetAddress ownHostname = ConnectionUtils.findConnectingAddress(jobManagerAddress, 2000, 400);
 		actorSystem = AkkaUtils.createActorSystem(flinkConfig,
 				new Some(new Tuple2<String, Integer>(ownHostname.getCanonicalHostName(), 0)));
 


[2/3] flink git commit: [FLINK-2766] [core] Add proper handling of IPv6 address literals in URLs

Posted by se...@apache.org.
[FLINK-2766] [core] Add proper handling of IPv6 address literals in URLs


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bfde1b73
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bfde1b73
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bfde1b73

Branch: refs/heads/master
Commit: bfde1b7379f0a0aef21c836e5ac0842474c7f98f
Parents: a3c0b44
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 29 17:45:06 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Oct 1 08:59:56 2015 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/util/NetUtils.java    |  80 +++++++++
 .../org/apache/flink/util/NetUtilsTest.java     |  97 +++++++++++
 .../apache/flink/runtime/client/JobClient.java  |   7 +-
 .../flink/runtime/net/ConnectionUtils.java      |   2 +-
 .../apache/flink/runtime/akka/AkkaUtils.scala   |  42 +++--
 .../flink/runtime/jobmanager/JobManager.scala   |  20 +--
 .../runtime/minicluster/FlinkMiniCluster.scala  |   6 +-
 .../flink/runtime/taskmanager/TaskManager.scala |   7 +-
 .../TaskManagerProcessReapingTest.java          |   9 +-
 .../flink/runtime/akka/AkkaUtilsTest.scala      |  46 +++++-
 .../fs/RollingSinkFaultTolerance2ITCase.java    |   9 +-
 .../fs/RollingSinkFaultToleranceITCase.java     |   9 +-
 .../connectors/fs/RollingSinkITCase.java        |   5 +-
 .../connectors/kafka/KafkaConsumerTestBase.java |  27 ++-
 .../connectors/kafka/KafkaTestBase.java         |   3 +-
 .../test/util/ForkableFlinkMiniCluster.scala    |   2 +-
 .../flink/test/runtime/IPv6HostnamesITCase.java | 165 +++++++++++++++++++
 17 files changed, 480 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
index 03721c5..da445ec 100644
--- a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
@@ -15,13 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.util;
 
+import com.google.common.net.InetAddresses;
 
 import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.ServerSocket;
 import java.net.URL;
+import java.net.UnknownHostException;
 
 public class NetUtils {
 	
@@ -68,6 +75,10 @@ public class NetUtils {
 		}
 	}
 
+	// ------------------------------------------------------------------------
+	//  Lookup of free ports
+	// ------------------------------------------------------------------------
+	
 	/**
 	 * Find a non-occupied port.
 	 *
@@ -86,4 +97,73 @@ public class NetUtils {
 
 		throw new RuntimeException("Could not find a free permitted port on the machine.");
 	}
+	
+
+	// ------------------------------------------------------------------------
+	//  Encoding of IP addresses for URLs
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * Encodes an IP address properly as a URL string. This method makes sure that IPv6 addresses
+	 * have the proper formatting to be included in URLs.
+	 * <p>
+	 * This method internally uses Guava's functionality to properly encode IPv6 addresses.
+	 * 
+	 * @param address The IP address to encode.
+	 * @return The proper URL string encoded IP address.
+	 */
+	public static String ipAddressToUrlString(InetAddress address) {
+		if (address == null) {
+			throw new NullPointerException("address is null");
+		}
+		else if (address instanceof Inet4Address) {
+			return address.getHostAddress();
+		}
+		else if (address instanceof Inet6Address) {
+			return '[' + InetAddresses.toAddrString(address) + ']';
+		}
+		else {
+			throw new IllegalArgumentException("Unrecognized type of InetAddress: " + address);
+		}
+	}
+
+	/**
+	 * Encodes an IP address and port to be included in a URL. In particular, this method makes
+	 * sure that IPv6 addresses have the proper formatting to be included in URLs.
+	 * 
+	 * @param address The address to be included in the URL.
+	 * @param port The port for the URL address.
+	 * @return The proper URL string encoded IP address and port.
+	 */
+	public static String ipAddressAndPortToUrlString(InetAddress address, int port) {
+		return ipAddressToUrlString(address) + ':' + port;
+	}
+
+	/**
+	 * Encodes an IP address and port to be included in a URL. In particular, this method makes
+	 * sure that IPv6 addresses have the proper formatting to be included in URLs.
+	 * 
+	 * @param address The socket address with the IP address and port.
+	 * @return The proper URL string encoded IP address and port.
+	 */
+	public static String socketAddressToUrlString(InetSocketAddress address) {
+		if (address.isUnresolved()) {
+			throw new IllegalArgumentException("Address cannot be resolved: " + address.getHostString());
+		}
+		return ipAddressAndPortToUrlString(address.getAddress(), address.getPort());
+	}
+
+	/**
+	 * Normalizes and encodes a hostname and port to be included in a URL.
+	 * In particular, this method makes sure that IPv6 address literals have the proper
+	 * formatting to be included in URLs.
+	 *
+	 * @param host The address to be included in the URL.
+	 * @param port The port for the URL address.
+	 * @return The proper URL string encoded IP address and port.
+	 * @throws java.net.UnknownHostException Thrown, if the hostname cannot be translated into a URL.
+	 */
+	public static String hostAndPortToUrlString(String host, int port) throws UnknownHostException {
+		return ipAddressAndPortToUrlString(InetAddress.getByName(host), port);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
new file mode 100644
index 0000000..cd2c13b
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import static org.junit.Assert.*;
+
+public class NetUtilsTest {
+
+	@Test
+	public void testIPv4toURL() {
+		try {
+			final String addressString = "192.168.0.1";
+
+			InetAddress address = InetAddress.getByName(addressString);
+			assertEquals(addressString, NetUtils.ipAddressToUrlString(address));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testIPv6toURL() {
+		try {
+			final String addressString = "2001:01db8:00:0:00:ff00:42:8329";
+			final String normalizedAddress = "[2001:1db8::ff00:42:8329]";
+
+			InetAddress address = InetAddress.getByName(addressString);
+			assertEquals(normalizedAddress, NetUtils.ipAddressToUrlString(address));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testIPv4URLEncoding() {
+		try {
+			final String addressString = "10.244.243.12";
+			final int port = 23453;
+			
+			InetAddress address = InetAddress.getByName(addressString);
+			InetSocketAddress socketAddress = new InetSocketAddress(address, port);
+			
+			assertEquals(addressString, NetUtils.ipAddressToUrlString(address));
+			assertEquals(addressString + ':' + port, NetUtils.ipAddressAndPortToUrlString(address, port));
+			assertEquals(addressString + ':' + port, NetUtils.socketAddressToUrlString(socketAddress));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testIPv6URLEncoding() {
+		try {
+			final String addressString = "2001:db8:10:11:12:ff00:42:8329";
+			final String bracketedAddressString = '[' + addressString + ']';
+			final int port = 23453;
+
+			InetAddress address = InetAddress.getByName(addressString);
+			InetSocketAddress socketAddress = new InetSocketAddress(address, port);
+
+			assertEquals(bracketedAddressString, NetUtils.ipAddressToUrlString(address));
+			assertEquals(bracketedAddressString + ':' + port, NetUtils.ipAddressAndPortToUrlString(address, port));
+			assertEquals(bracketedAddressString + ':' + port, NetUtils.socketAddressToUrlString(socketAddress));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index c51bc7c..a436881 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.messages.JobClientMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.util.SerializedThrowable;
 
+import org.apache.flink.util.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,9 +75,11 @@ public class JobClient {
 		ActorSystem system = AkkaUtils.createActorSystem(config, remoting);
 		Address address = system.provider().getDefaultAddress();
 
-		String host = address.host().isDefined() ? address.host().get() : "(unknown)";
+		String hostAddress = address.host().isDefined() ?
+				NetUtils.ipAddressToUrlString(InetAddress.getByName(address.host().get())) :
+				"(unknown)";
 		int port = address.port().isDefined() ? ((Integer) address.port().get()) : -1;
-		LOG.info("Started JobClient actor system at " + host + ':' + port);
+		LOG.info("Started JobClient actor system at " + hostAddress + ':' + port);
 
 		return system;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
index 0ed9345..542e69e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
@@ -365,7 +365,7 @@ public class ConnectionUtils {
 
 						boolean logging = elapsedTime >= startLoggingAfter.toMillis();
 						if (logging) {
-							LOG.info("Trying to connect to address {}." + targetAddress);
+							LOG.info("Trying to connect to address {}", targetAddress);
 						}
 
 						do {

http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 8007ef6..bf679c9 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -19,13 +19,14 @@
 package org.apache.flink.runtime.akka
 
 import java.io.IOException
-import java.net.{InetSocketAddress, InetAddress}
+import java.net._
 import java.util.concurrent.{TimeUnit, Callable}
 
 import akka.actor._
 import akka.pattern.{ask => akkaAsk}
 import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.util.NetUtils
 import org.jboss.netty.logging.{Slf4JLoggerFactory, InternalLoggerFactory}
 import org.slf4j.LoggerFactory
 import scala.concurrent._
@@ -102,6 +103,7 @@ object AkkaUtils {
    *                         then an Akka config for local actor system will be returned
    * @return Akka config
    */
+  @throws(classOf[UnknownHostException])
   def getAkkaConfig(configuration: Configuration,
                     listeningAddress: Option[(String, Int)]): Config = {
     val defaultConfig = getBasicAkkaConfig(configuration)
@@ -109,8 +111,9 @@ object AkkaUtils {
     listeningAddress match {
 
       case Some((hostname, port)) =>
-        val ipAddress = "\"" + InetAddress.getByName(hostname).getHostAddress() + "\""
-        val remoteConfig = getRemoteAkkaConfig(configuration, ipAddress, port)
+        val ipAddress = InetAddress.getByName(hostname)
+        val hostString = "\"" + NetUtils.ipAddressToUrlString(ipAddress) + "\""
+        val remoteConfig = getRemoteAkkaConfig(configuration, hostString, port)
         remoteConfig.withFallback(defaultConfig)
 
       case None =>
@@ -513,23 +516,30 @@ object AkkaUtils {
     * the Akka URL does not contain the hostname and port information, e.g. a local Akka URL is
     * provided, then an [[Exception]] is thrown.
     *
-    * @param akkaURL
-    * @throws java.lang.Exception
-    * @return
+    * @param akkaURL The URL to extract the host and port from.
+    * @throws java.lang.Exception Thrown, if the given string does not represent a proper url
+    * @return The InetSocketAddress with the extracted host and port.
     */
   @throws(classOf[Exception])
   def getInetSockeAddressFromAkkaURL(akkaURL: String): InetSocketAddress = {
     // AkkaURLs have the form schema://systemName@host:port/.... if it's a remote Akka URL
-    val hostPortRegex = """@([^/:]*):(\d*)""".r
-
-    hostPortRegex.findFirstMatchIn(akkaURL) match {
-      case Some(m) =>
-        val host = m.group(1)
-        val port = m.group(2).toInt
-
-        new InetSocketAddress(host, port)
-      case None => throw new Exception("Could not retrieve InetSocketAddress from " +
-        s"Akka URL $akkaURL")
+    try {
+      // we need to manually strip the protocol, because "akka.tcp" is not
+      // a valid protocol for Java's URL class
+      val protocolonPos = akkaURL.indexOf("://")
+      if (protocolonPos == -1 || protocolonPos >= akkaURL.length - 4) {
+        throw new MalformedURLException()
+      }
+      
+      val url = new URL("http://" + akkaURL.substring(protocolonPos + 3))
+      if (url.getHost == null || url.getPort == -1) {
+        throw new MalformedURLException()
+      }
+      new InetSocketAddress(url.getHost, url.getPort)
+    }
+    catch {
+      case _ : MalformedURLException =>
+        throw new Exception(s"Could not retrieve InetSocketAddress from Akka URL $akkaURL")
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 07a5977..b7f76ce 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -44,12 +44,8 @@ import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
 import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
 import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
 import org.apache.flink.runtime.messages.TaskMessages.{PartitionState, UpdateTaskExecutionState}
-
-import org.apache.flink.runtime.messages.accumulators.{AccumulatorResultsErroneous,
-AccumulatorResultsFound, RequestAccumulatorResults, AccumulatorMessage,
-AccumulatorResultStringsFound, RequestAccumulatorResultsStringified}
-import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage,
-AcknowledgeCheckpoint}
+import org.apache.flink.runtime.messages.accumulators._
+import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint}
 import org.apache.flink.runtime.messages.webmonitor._
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.SecurityUtils
@@ -67,7 +63,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkSchedule
 import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.messages.RegistrationMessages._
 import org.apache.flink.runtime.messages.TaskManagerMessages.{SendStackTrace, Heartbeat}
-import org.apache.flink.util.{SerializedValue, ExceptionUtils, InstantiationUtil}
+import org.apache.flink.util.{NetUtils, SerializedValue, ExceptionUtils, InstantiationUtil}
 
 import scala.concurrent._
 import scala.concurrent.duration._
@@ -1237,7 +1233,8 @@ object JobManager {
     LOG.info("Starting JobManager")
 
     // Bring up the job manager actor system first, bind it to the given address.
-    LOG.info(s"Starting JobManager actor system at $listeningAddress:$listeningPort.")
+    val hostPortUrl = NetUtils.hostAndPortToUrlString(listeningAddress, listeningPort)
+    LOG.info(s"Starting JobManager actor system at $hostPortUrl")
 
     val jobManagerSystem = try {
       val akkaConfig = AkkaUtils.getAkkaConfig(
@@ -1451,8 +1448,9 @@ object JobManager {
 
     val executionMode = config.getJobManagerMode
     val streamingMode = config.getStreamingMode
-
-    LOG.info(s"Starting JobManager on $host:$port with execution mode $executionMode and " +
+    val hostPortUrl = NetUtils.hostAndPortToUrlString(host, port)
+    
+    LOG.info(s"Starting JobManager on $hostPortUrl with execution mode $executionMode and " +
       s"streaming mode $streamingMode")
 
     (configuration, executionMode, streamingMode, host, port)
@@ -1657,7 +1655,7 @@ object JobManager {
       address: InetSocketAddress,
       name: Option[String] = None)
     : String = {
-    val hostPort = address.getAddress().getHostAddress() + ":" + address.getPort()
+    val hostPort = NetUtils.socketAddressToUrlString(address)
 
     getJobManagerAkkaURLHelper(s"akka.tcp://flink@$hostPort", name)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 839193b..b3cff51 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -76,7 +76,9 @@ abstract class FlinkMiniCluster(
 
   // NOTE: THIS MUST BE getByName("localhost"), which is 127.0.0.1 and
   // not getLocalHost(), which may be 127.0.1.1
-  val hostname = InetAddress.getByName("localhost").getHostAddress()
+  val hostname = userConfiguration.getString(
+    ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
+    InetAddress.getByName("localhost").getHostAddress())
 
   val configuration = generateConfiguration(userConfiguration)
 
@@ -243,7 +245,7 @@ abstract class FlinkMiniCluster(
     jobManagerActorSystems = Some(jmActorSystems)
     jobManagerActors = Some(jmActors)
 
-    val lrs = createLeaderRetrievalService();
+    val lrs = createLeaderRetrievalService()
 
     leaderRetrievalService = Some(lrs)
     lrs.start(this)

http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 7a1bec5..bf23021 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1425,8 +1425,9 @@ object TaskManager {
     LOG.info(s"Starting TaskManager in streaming mode $streamingMode")
 
     // Bring up the TaskManager actor system first, bind it to the given address.
-
-    LOG.info(s"Starting TaskManager actor system at $taskManagerHostname:$actorSystemPort")
+    
+    LOG.info("Starting TaskManager actor system at " + 
+      NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort))
 
     val taskManagerSystem = try {
       val akkaConfig = AkkaUtils.getAkkaConfig(
@@ -1443,7 +1444,7 @@ object TaskManager {
         if (t.isInstanceOf[org.jboss.netty.channel.ChannelException]) {
           val cause = t.getCause()
           if (cause != null && t.getCause().isInstanceOf[java.net.BindException]) {
-            val address = taskManagerHostname + ":" + actorSystemPort
+            val address = NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort)
             throw new IOException("Unable to bind TaskManager actor system to address " +
               address + " - " + cause.getMessage(), t)
           }

http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
index 1334bcc..ed03ae7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
@@ -40,6 +40,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.StringWriter;
+import java.net.InetAddress;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
@@ -78,10 +79,11 @@ public class TaskManagerProcessReapingTest {
 			tempLogFile.deleteOnExit();
 			CommonTestUtils.printLog4jDebugConfig(tempLogFile);
 
+			final InetAddress localhost = InetAddress.getByName("localhost");
 			final int jobManagerPort = NetUtils.getAvailablePort();
 
 			// start a JobManager
-			Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
+			Tuple2<String, Object> localAddress = new Tuple2<String, Object>(localhost.getHostAddress(), jobManagerPort);
 			jmActorSystem = AkkaUtils.createActorSystem(
 					new Configuration(), new Some<Tuple2<String, Object>>(localAddress));
 
@@ -109,8 +111,9 @@ public class TaskManagerProcessReapingTest {
 
 			// grab the reference to the TaskManager. try multiple times, until the process
 			// is started and the TaskManager is up
-			String taskManagerActorName = String.format("akka.tcp://flink@%s:%d/user/%s",
-					"127.0.0.1", taskManagerPort, TaskManager.TASK_MANAGER_NAME());
+			String taskManagerActorName = String.format("akka.tcp://flink@%s/user/%s",
+					org.apache.flink.util.NetUtils.ipAddressAndPortToUrlString(localhost, taskManagerPort),
+					TaskManager.TASK_MANAGER_NAME());
 
 			ActorRef taskManagerRef = null;
 			Throwable lastError = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
index 59477db..4e08857 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.akka
 
-import java.net.{InetAddress, InetSocketAddress}
+import java.net.InetSocketAddress
 
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.junit.runner.RunWith
@@ -64,4 +64,48 @@ class AkkaUtilsTest
     result should equal(expected)
   }
 
+  test("getHostFromAkkaURL should handle 'akka.tcp' as protocol") {
+    val url = "akka.tcp://flink@localhost:1234/user/jobmanager"
+    val expected = new InetSocketAddress("localhost", 1234)
+
+    val result = AkkaUtils.getInetSockeAddressFromAkkaURL(url)
+
+    result should equal(expected)
+  }
+
+  test("getHostFromAkkaURL should properly handle IPv4 addresses in URLs") {
+    val IPv4AddressString = "192.168.0.1"
+    val port = 1234
+    val address = new InetSocketAddress(IPv4AddressString, port)
+    
+    val url = s"akka://flink@$IPv4AddressString:$port/user/jobmanager"
+
+    val result = AkkaUtils.getInetSockeAddressFromAkkaURL(url)
+
+    result should equal(address)
+  }
+
+  test("getHostFromAkkaURL should properly handle IPv6 addresses in URLs") {
+    val IPv6AddressString = "2001:db8:10:11:12:ff00:42:8329"
+    val port = 1234
+    val address = new InetSocketAddress(IPv6AddressString, port)
+
+    val url = s"akka://flink@[$IPv6AddressString]:$port/user/jobmanager"
+
+    val result = AkkaUtils.getInetSockeAddressFromAkkaURL(url)
+
+    result should equal(address)
+  }
+
+  test("getHostFromAkkaURL should properly handle IPv6 addresses in 'akka.tcp' URLs") {
+    val IPv6AddressString = "2001:db8:10:11:12:ff00:42:8329"
+    val port = 1234
+    val address = new InetSocketAddress(IPv6AddressString, port)
+
+    val url = s"akka.tcp://flink@[$IPv6AddressString]:$port/user/jobmanager"
+
+    val result = AkkaUtils.getInetSockeAddressFromAkkaURL(url)
+
+    result should equal(address)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
index 9c70ed2..7d127ff 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
+import org.apache.flink.util.NetUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -69,7 +70,6 @@ public class RollingSinkFaultTolerance2ITCase extends StreamFaultToleranceTestBa
 
 	private static MiniDFSCluster hdfsCluster;
 	private static org.apache.hadoop.fs.FileSystem dfs;
-	private static String hdfsURI;
 
 	private static String outPath;
 
@@ -87,10 +87,9 @@ public class RollingSinkFaultTolerance2ITCase extends StreamFaultToleranceTestBa
 
 		dfs = hdfsCluster.getFileSystem();
 
-		hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
-
-		outPath = hdfsURI + "/string-non-rolling-out-no-checkpoint";
-
+		outPath = "hdfs://"
+				+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(),  hdfsCluster.getNameNodePort())
+				+ "/string-non-rolling-out-no-checkpoint";
 	}
 
 	@AfterClass

http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
index e0592e9..65904d2 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
+import org.apache.flink.util.NetUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -64,7 +65,6 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas
 
 	private static MiniDFSCluster hdfsCluster;
 	private static org.apache.hadoop.fs.FileSystem dfs;
-	private static String hdfsURI;
 
 	private static String outPath;
 
@@ -82,10 +82,9 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas
 
 		dfs = hdfsCluster.getFileSystem();
 
-		hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
-
-		outPath = hdfsURI + "/string-non-rolling-out";
-
+		outPath = "hdfs://"
+				+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
+				+ "/string-non-rolling-out";
 	}
 
 	@AfterClass

http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index 008b4b6..9770f41 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.NetUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -79,7 +80,9 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 
 		dfs = hdfsCluster.getFileSystem();
 
-		hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
+		hdfsURI = "hdfs://"
+				+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
+				+ "/";
 	}
 
 	@AfterClass

http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 5016e7e..e9a5728 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -69,6 +69,7 @@ import org.apache.flink.testutils.junit.RetryOnException;
 import org.apache.flink.testutils.junit.RetryRule;
 import org.apache.flink.util.Collector;
 
+import org.apache.flink.util.NetUtils;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.junit.Assert;
 
@@ -841,7 +842,11 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		while (firstPart.errorCode() != 0);
 		zkClient.close();
 
-		final String leaderToShutDown = firstPart.leader().get().connectionString();
+		final kafka.cluster.Broker leaderToShutDown = firstPart.leader().get();
+		final String leaderToShutDownConnection = 
+				NetUtils.hostAndPortToUrlString(leaderToShutDown.host(), leaderToShutDown.port());
+		
+		
 		final int leaderIdToShutDown = firstPart.leader().get().id();
 		LOG.info("Leader to shutdown {}", leaderToShutDown);
 
@@ -863,7 +868,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		env
 				.addSource(kafkaSource)
 				.map(new PartitionValidatingMapper(parallelism, 1))
-				.map(new BrokerKillingMapper<Integer>(leaderToShutDown, failAfterElements))
+				.map(new BrokerKillingMapper<Integer>(leaderToShutDownConnection, failAfterElements))
 				.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
 
 		BrokerKillingMapper.killedLeaderBefore = false;
@@ -1068,14 +1073,28 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 					// shut down a Kafka broker
 					KafkaServer toShutDown = null;
 					for (KafkaServer kafkaServer : brokers) {
-						if (leaderToShutDown.equals(kafkaServer.config().advertisedHostName()+ ":"+ kafkaServer.config().advertisedPort())) {
+						String connectionUrl = 
+								NetUtils.hostAndPortToUrlString(
+										kafkaServer.config().advertisedHostName(),
+										kafkaServer.config().advertisedPort());
+						if (leaderToShutDown.equals(connectionUrl)) {
 							toShutDown = kafkaServer;
 							break;
 						}
 					}
 	
 					if (toShutDown == null) {
-						throw new Exception("Cannot find broker to shut down");
+						StringBuilder listOfBrokers = new StringBuilder();
+						for (KafkaServer kafkaServer : brokers) {
+							listOfBrokers.append(
+									NetUtils.hostAndPortToUrlString(
+											kafkaServer.config().advertisedHostName(),
+											kafkaServer.config().advertisedPort()));
+							listOfBrokers.append(" ; ");
+						}
+						
+						throw new Exception("Cannot find broker to shut down: " + leaderToShutDown
+								+ " ; available brokers: " + listOfBrokers.toString());
 					}
 					else {
 						hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;

http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 61f384a..d511796 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -59,6 +59,7 @@ import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -148,7 +149,7 @@ public abstract class KafkaTestBase extends TestLogger {
 				SocketServer socketServer = brokers.get(i).socketServer();
 				
 				String host = socketServer.host() == null ? "localhost" : socketServer.host();
-				brokerConnectionStrings += host+":"+socketServer.port()+",";
+				brokerConnectionStrings += hostAndPortToUrlString(host, socketServer.port()) + ",";
 			}
 
 			LOG.info("ZK and KafkaServer started.");

http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index c8b0e0c..5c5a465 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -96,7 +96,7 @@ class ForkableFlinkMiniCluster(
       ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
       ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
 
-    if(jobManagerPort > 0) {
+    if (jobManagerPort > 0) {
       config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index)
     }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
new file mode 100644
index 0000000..af51ed6
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import scala.Some;
+
+import java.io.IOException;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.NetworkInterface;
+import java.net.ServerSocket;
+import java.util.Enumeration;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
+public class IPv6HostnamesITCase extends TestLogger { // runs a word-count job on a mini cluster that is addressed through a local IPv6 address
+	/** Starts a mini cluster configured with a local IPv6 address and checks a word count submitted through a remote environment. */
+	@Test
+	public void testClusterWithIPv6host() {
+		// without a usable IPv6 address the method simply returns, so the test ends without running any assertion
+		final Inet6Address ipv6address = getLocalIPv6Address();
+		if (ipv6address == null) {
+			System.err.println("--- Cannot find a non-loopback local IPv6 address, skipping IPv6HostnamesITCase");
+			return;
+		}
+
+		
+		// declared outside the try block so that the finally clause can shut the cluster down
+		ForkableFlinkMiniCluster flink = null;
+		try {
+			final String addressString = ipv6address.getHostAddress();
+			log.info("Test will use IPv6 address " + addressString + " for connection tests");
+			// both the JobManager address and the TaskManager hostname are set to the IPv6 address string
+			Configuration conf = new Configuration();
+			conf.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, addressString);
+			conf.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, addressString);
+			conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
+			conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+			conf.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16); // NOTE(review): unit not visible here, presumably megabytes - confirm
+			
+			flink = new ForkableFlinkMiniCluster(conf, false, StreamingMode.BATCH_ONLY);
+			flink.start();
+			// connect as a remote client, using the IPv6 address string and the port reported by getLeaderRPCPort()
+			ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(addressString, flink.getLeaderRPCPort());
+
+			// get input data
+			DataSet<String> text = env.fromElements(WordCountData.TEXT.split("\n"));
+			// lower-case each line, split it on non-word characters, emit (token, 1) per non-empty token, then sum per token
+			DataSet<Tuple2<String, Integer>> counts =text
+					.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
+						@Override
+						public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
+							for (String token : value.toLowerCase().split("\\W+")) {
+								if (token.length() > 0) {
+									out.collect(new Tuple2<String, Integer>(token, 1));
+								}
+							}
+						}
+					})
+					.groupBy(0).sum(1);
+
+			List<Tuple2<String, Integer>> result = counts.collect();
+			// compare the collected tuples with the expected counts from WordCountData
+			TestBaseUtils.compareResultAsText(result, WordCountData.COUNTS_AS_TUPLES);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage()); // any exception raised in the try block fails the test
+		}
+		finally {
+			if (flink != null) {
+				flink.shutdown();
+			}
+		}
+	}
+	/** Returns the first local IPv6 address (neither loopback nor wildcard) for which a ServerSocket bind and an
+	 *  AkkaUtils.createActorSystem call both complete without throwing; returns null if no address qualifies or the scan is aborted. */
+	private Inet6Address getLocalIPv6Address() {
+		try {
+			Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
+			while (e.hasMoreElements()) {
+				NetworkInterface netInterface = e.nextElement();
+
+				// for each address of the network interface
+				Enumeration<InetAddress> ee = netInterface.getInetAddresses();
+				while (ee.hasMoreElements()) {
+					InetAddress addr = ee.nextElement();
+					
+					// only IPv6 addresses that are neither loopback nor the wildcard ("any local") address are candidates
+					if (addr instanceof Inet6Address && (!addr.isLoopbackAddress()) && (!addr.isAnyLocalAddress())) {
+						// see if it is possible to bind to the address
+						InetSocketAddress socketAddress = new InetSocketAddress(addr, 0); // port 0: the system picks a free port on bind
+						
+						try {
+							log.info("Considering address " + addr);
+							
+							// test whether we can bind a socket to that address
+							log.info("Testing whether sockets can bind to " + addr);
+							ServerSocket sock = new ServerSocket();
+							sock.bind(socketAddress); // NOTE(review): sock is not closed if bind() throws
+							sock.close();
+
+							// test whether Akka's netty can bind to the address
+							log.info("Testing whether Akka can use " + addr);
+							int port = NetUtils.getAvailablePort();
+							ActorSystem as = AkkaUtils.createActorSystem(
+									new Configuration(),
+									new Some<scala.Tuple2<String, Object>>(new scala.Tuple2<String, Object>(addr.getHostAddress(), port)));
+							as.shutdown(); // the actor system is only a probe and is shut down right away
+
+							log.info("Using address " + addr);
+							return (Inet6Address) addr;
+						}
+						catch (IOException ignored) {
+							// this address is not usable; the loop continues with the next candidate address
+						}
+					}
+				}
+			}
+			
+			return null;
+		}
+		catch (Exception e) {
+			return null; // NOTE(review): any exception other than the IOException handled above ends the scan here without trying further addresses
+		}
+	}
+}


[3/3] flink git commit: [hotfix] [build] Add joda-convert as a non-optional dependency, to fix Scala interoperability

Posted by se...@apache.org.
[hotfix] [build] Add joda-convert as a non-optional dependency, to fix Scala interoperability


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c04a7704
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c04a7704
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c04a7704

Branch: refs/heads/master
Commit: c04a770421c6e13273307d6aaf9de462c4398360
Parents: bfde1b7
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Sep 30 14:51:07 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Oct 1 08:59:56 2015 +0200

----------------------------------------------------------------------
 flink-java/pom.xml | 5 +++++
 pom.xml            | 6 ++++++
 2 files changed, 11 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c04a7704/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index 49e6099..c8f88f4 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -87,6 +87,11 @@ under the License.
 		</dependency>
 
 		<dependency>
+			<groupId>org.joda</groupId>
+			<artifactId>joda-convert</artifactId>
+		</dependency>
+
+		<dependency>
 			<groupId>com.google.guava</groupId>
 			<artifactId>guava</artifactId>
 			<version>${guava.version}</version>

http://git-wip-us.apache.org/repos/asf/flink/blob/c04a7704/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 573e49c..6cdf7fa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -251,6 +251,12 @@ under the License.
 				<version>2.5</version>
 			</dependency>
 
+			<dependency>
+				<groupId>org.joda</groupId>
+				<artifactId>joda-convert</artifactId>
+				<version>1.7</version>
+			</dependency>
+			
 			<!-- stax is pulled in different versions by different transitive dependencies-->
 			<dependency>
 				<groupId>stax</groupId>