You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/12/16 17:00:39 UTC

[6/7] flink git commit: [FLINK-2821] use custom Akka build to listen on all interfaces

[FLINK-2821] use custom Akka build to listen on all interfaces

This uses Flakka (a custom Akka 2.3 build) to resolve the issue that
the bind address needs to be matching the external address of the
JobManager. With the changes applied, we can now bind to all
interfaces, e.g. via 0.0.0.0 (IPv4) or :: (IPv6).

For this to work properly, the configuration entry
JOB_MANAGER_IPC_ADDRESS now represents the external address of the
JobManager. Consequently, it should not be resolved to an IP address
anymore because it may not be resolvable from within containered
environments. Akka treats this address as the logical address. Any
messages which are not tagged with this address will be received by
the Actor System (because we listen on all interfaces) but will be
dropped subsequently. In addition, we need the external address for
the JobManager to be able to publish it to Zookeeper for HA setups.

Flakka: https://github.com/mxm/flakka
Patch applied: https://github.com/akka/akka/pull/15610

- convert host to lower case
- use consistent format for IPv6 address
- adapt config and test cases
- adapt documentation to clarify the address config entry
- TaskManager: resolve the initial hostname of the StandaloneLeaderRetrievalService

This closes #2917.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/27ebdf7a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/27ebdf7a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/27ebdf7a

Branch: refs/heads/master
Commit: 27ebdf7acde0e2c3ad183503d0588ca91e63d729
Parents: e9e6688
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Nov 16 15:50:01 2016 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Dec 16 17:51:59 2016 +0100

----------------------------------------------------------------------
 docs/setup/config.md                            |  4 +-
 flink-clients/pom.xml                           |  4 +-
 .../flink/client/program/ClusterClient.java     |  2 +-
 .../RemoteExecutorHostnameResolutionTest.java   | 23 ++---
 ...rRetrievalServiceHostnameResolutionTest.java | 37 ++++----
 .../org/apache/flink/storm/api/FlinkClient.java |  4 +-
 .../java/org/apache/flink/util/NetUtils.java    | 79 +++++++++++++++-
 .../org/apache/flink/util/NetUtilsTest.java     | 59 ++++++++++++
 flink-dist/src/main/resources/flink-conf.yaml   | 14 ++-
 flink-mesos/pom.xml                             | 16 ++--
 .../main/resources/archetype-resources/pom.xml  |  6 +-
 .../main/resources/archetype-resources/pom.xml  |  6 +-
 flink-runtime-web/pom.xml                       |  4 +-
 flink-runtime/pom.xml                           | 16 ++--
 .../jobmanager/JobManagerCliOptions.java        |  2 +-
 .../runtime/util/LeaderRetrievalUtils.java      | 16 +++-
 .../flink/runtime/util/StandaloneUtils.java     | 42 ++++++---
 .../apache/flink/runtime/akka/AkkaUtils.scala   | 72 ++++++++------
 .../flink/runtime/jobmanager/JobManager.scala   | 98 ++++++++------------
 .../runtime/minicluster/FlinkMiniCluster.scala  |  2 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  5 +-
 .../JobManagerProcessReapingTest.java           |  4 +-
 .../jobmanager/JobManagerStartupTest.java       |  4 +-
 .../ZooKeeperLeaderRetrievalTest.java           |  8 +-
 .../TaskManagerProcessReapingTestBase.java      |  6 +-
 .../taskmanager/TaskManagerStartupTest.java     |  7 +-
 .../runtime/testutils/JobManagerProcess.java    |  4 +-
 .../flink/runtime/akka/AkkaUtilsTest.scala      |  5 +-
 .../jobmanager/JobManagerConnectionTest.scala   |  4 +-
 flink-tests/pom.xml                             |  4 +-
 flink-yarn-tests/pom.xml                        |  4 +-
 .../flink/yarn/YARNHighAvailabilityITCase.java  |  2 +-
 flink-yarn/pom.xml                              | 16 ++--
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |  2 +-
 pom.xml                                         | 30 +++---
 35 files changed, 390 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 89e8207..680f4f7 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -48,7 +48,7 @@ The configuration files for the TaskManagers can be different, Flink does not as
 
 - `env.java.opts.taskmanager`: TaskManager-specific JVM options. These are used in addition to the regular `env.java.opts`. This configuration option is ignored by the YARN client.
 
-- `jobmanager.rpc.address`: The IP address of the JobManager, which is the master/coordinator of the distributed system (DEFAULT: localhost).
+- `jobmanager.rpc.address`: The external address of the JobManager, which is the master/coordinator of the distributed system (DEFAULT: localhost). **Note:** The address (host name or IP) should be accessible by all nodes including the client.
 
 - `jobmanager.rpc.port`: The port number of the JobManager (DEFAULT: 6123).
 
@@ -206,7 +206,7 @@ These parameters configure the default HDFS used by Flink. Setups that do not sp
 
 The following parameters configure Flink's JobManager and TaskManagers.
 
-- `jobmanager.rpc.address`: The IP address of the JobManager, which is the master/coordinator of the distributed system (DEFAULT: **localhost**).
+- `jobmanager.rpc.address`: The external address of the JobManager, which is the master/coordinator of the distributed system (DEFAULT: **localhost**). **Note:** The address (host name or IP) should be accessible by all nodes including the client.
 
 - `jobmanager.rpc.port`: The port number of the JobManager (DEFAULT: **6123**).
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-clients/pom.xml
----------------------------------------------------------------------
diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml
index 5639ed6..14b5461 100644
--- a/flink-clients/pom.xml
+++ b/flink-clients/pom.xml
@@ -78,8 +78,8 @@ under the License.
 		</dependency>
 		
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-testkit_${scala.binary.version}</artifactId>
+			<groupId>com.data-artisans</groupId>
+			<artifactId>flakka-testkit_${scala.binary.version}</artifactId>
 			<scope>test</scope>
 		</dependency>
 	</dependencies>

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 34a9197..8d0e841 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -413,7 +413,7 @@ public abstract class ClusterClient {
 
 		final LeaderRetrievalService leaderRetrievalService;
 		try {
-			leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
+			leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig, true);
 		} catch (Exception e) {
 			throw new ProgramInvocationException("Could not create the leader retrieval service", e);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
index fb5200b..07edb3a 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.net.InetAddress;
@@ -39,15 +40,17 @@ public class RemoteExecutorHostnameResolutionTest {
 
 	private static final String nonExistingHostname = "foo.bar.com.invalid";
 	private static final int port = 14451;
-	
-	
+
+	@BeforeClass
+	public static void check() {
+		checkPreconditions();
+	}
+
 	@Test
 	public void testUnresolvableHostname1() {
-		
-		checkPreconditions();
-		
+
+		RemoteExecutor exec = new RemoteExecutor(nonExistingHostname, port);
 		try {
-			RemoteExecutor exec = new RemoteExecutor(nonExistingHostname, port);
 			exec.executePlan(getProgram());
 			fail("This should fail with an ProgramInvocationException");
 		}
@@ -65,12 +68,10 @@ public class RemoteExecutorHostnameResolutionTest {
 	@Test
 	public void testUnresolvableHostname2() {
 
-		checkPreconditions();
-		
-		try {
-			InetSocketAddress add = new InetSocketAddress(nonExistingHostname, port);
-			RemoteExecutor exec = new RemoteExecutor(add, new Configuration(),
+		InetSocketAddress add = new InetSocketAddress(nonExistingHostname, port);
+		RemoteExecutor exec = new RemoteExecutor(add, new Configuration(),
 				Collections.<URL>emptyList(), Collections.<URL>emptyList());
+		try {
 			exec.executePlan(getProgram());
 			fail("This should fail with an ProgramInvocationException");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
index ee26145..dd7d8bc 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
@@ -22,27 +22,34 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.util.TestLogger;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 
 import static org.junit.Assert.*;
-import static org.junit.Assume.*;
+import static org.junit.Assume.assumeTrue;
 
 /**
  * Tests that verify that the LeaderRetrievalSevice correctly handles non-resolvable host names
  * and does not fail with another exception
  */
 public class LeaderRetrievalServiceHostnameResolutionTest extends TestLogger {
-	
+
 	private static final String nonExistingHostname = "foo.bar.com.invalid";
-	
+
+	@BeforeClass
+	public static void check() {
+		checkPreconditions();
+	}
+
+	/*
+	 * Tests that the StandaloneLeaderRetrievalService resolves host names if specified.
+	 */
 	@Test
 	public void testUnresolvableHostname1() {
-		
-		checkPreconditions();
-		
+
 		try {
 			Configuration config = new Configuration();
 
@@ -50,30 +57,28 @@ public class LeaderRetrievalServiceHostnameResolutionTest extends TestLogger {
 			config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 17234);
 
 			LeaderRetrievalUtils.createLeaderRetrievalService(config);
-			fail("This should fail with an UnknownHostException");
-		}
-		catch (UnknownHostException e) {
-			// that is what we want!
 		}
 		catch (Exception e) {
-			System.err.println("Wrong exception!");
+			System.err.println("Shouldn't throw an exception!");
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
 	}
 
+	/*
+	 * Tests that the StandaloneLeaderRetrievalService does not resolve host names by default.
+	 */
 	@Test
 	public void testUnresolvableHostname2() {
 
-		checkPreconditions();
-		
 		try {
 			Configuration config = new Configuration();
+
 			config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, nonExistingHostname);
 			config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 17234);
 
-			LeaderRetrievalUtils.createLeaderRetrievalService(config);
-			fail("This should fail with an UnknownHostException");
+			LeaderRetrievalUtils.createLeaderRetrievalService(config, true);
+			fail("This should fail with an IllegalConfigurationException");
 		}
 		catch (UnknownHostException e) {
 			// that is what we want!
@@ -84,7 +89,7 @@ public class LeaderRetrievalServiceHostnameResolutionTest extends TestLogger {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	private static void checkPreconditions() {
 		// the test can only work if the invalid URL cannot be resolves
 		// some internet providers resolve unresolvable URLs to navigational aid servers,

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
index 9f47d60..6019aa3 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -53,6 +53,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
 import org.apache.flink.storm.util.StormConfig;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 
+import org.apache.flink.util.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,7 +64,6 @@ import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URL;
 import java.util.Collections;
@@ -333,7 +333,7 @@ public class FlinkClient {
 		}
 
 		return JobManager.getJobManagerActorRef(AkkaUtils.getAkkaProtocol(configuration),
-				new InetSocketAddress(this.jobManagerHost, this.jobManagerPort),
+				NetUtils.unresolvedHostAndPortToNormalizedString(this.jobManagerHost, this.jobManagerPort),
 				actorSystem, AkkaUtils.getLookupTimeout(configuration));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
index 6f63eb4..d4437e4 100644
--- a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
@@ -20,8 +20,10 @@ package org.apache.flink.util;
 
 import org.apache.flink.annotation.Internal;
 
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import sun.net.util.IPAddressUtil;
 
 import java.io.IOException;
 import java.net.Inet4Address;
@@ -40,6 +42,9 @@ import java.util.Iterator;
 public class NetUtils {
 
 	private static final Logger LOG = LoggerFactory.getLogger(NetUtils.class);
+
+	/** The wildcard address to listen on all interfaces (either 0.0.0.0 or ::) */
+	private static final String WILDCARD_ADDRESS = new InetSocketAddress(0).getAddress().getHostAddress();
 	
 	/**
 	 * Turn a fully qualified domain name (fqdn) into a hostname. If the fqdn has multiple subparts
@@ -111,7 +116,55 @@ public class NetUtils {
 	// ------------------------------------------------------------------------
 	//  Encoding of IP addresses for URLs
 	// ------------------------------------------------------------------------
-	
+
+	/**
+	 * Returns an address in a normalized format for Akka.
+	 * When an IPv6 address is specified, it normalizes the IPv6 address to avoid
+	 * complications with the exact URL match policy of Akka.
+	 * @param host The hostname, IPv4 or IPv6 address
+	 * @return host which will be normalized if it is an IPv6 address
+	 */
+	public static String unresolvedHostToNormalizedString(String host) {
+		// Return loopback interface address if host is null
+		// This represents the behavior of {@code InetAddress.getByName } and RFC 3330
+		if (host == null) {
+			host = InetAddress.getLoopbackAddress().getHostAddress();
+		} else {
+			host = host.trim().toLowerCase();
+		}
+
+		// normalize and valid address
+		if (IPAddressUtil.isIPv6LiteralAddress(host)) {
+			byte[] ipV6Address = IPAddressUtil.textToNumericFormatV6(host);
+			host = getIPv6UrlRepresentation(ipV6Address);
+		} else if (!IPAddressUtil.isIPv4LiteralAddress(host)) {
+			try {
+				// We don't allow these in hostnames
+				Preconditions.checkArgument(!host.startsWith("."));
+				Preconditions.checkArgument(!host.endsWith("."));
+				Preconditions.checkArgument(!host.contains(":"));
+			} catch (Exception e) {
+				throw new IllegalConfigurationException("The configured hostname is not valid", e);
+			}
+		}
+
+		return host;
+	}
+
+	/**
+	 * Returns a valid address for Akka. It returns a String of format 'host:port'.
+	 * When an IPv6 address is specified, it normalizes the IPv6 address to avoid
+	 * complications with the exact URL match policy of Akka.
+	 * @param host The hostname, IPv4 or IPv6 address
+	 * @param port The port
+	 * @return host:port where host will be normalized if it is an IPv6 address
+	 */
+	public static String unresolvedHostAndPortToNormalizedString(String host, int port) {
+		Preconditions.checkArgument(port >= 0 && port < 65536,
+			"Port is not within the valid range,");
+		return unresolvedHostToNormalizedString(host) + ":" + port;
+	}
+
 	/**
 	 * Encodes an IP address properly as a URL string. This method makes sure that IPv6 addresses
 	 * have the proper formatting to be included in URLs.
@@ -137,7 +190,7 @@ public class NetUtils {
 	/**
 	 * Encodes an IP address and port to be included in URL. in particular, this method makes
 	 * sure that IPv6 addresses have the proper formatting to be included in URLs.
-	 * 
+	 *
 	 * @param address The address to be included in the URL.
 	 * @param port The port for the URL address.
 	 * @return The proper URL string encoded IP address and port.
@@ -176,14 +229,24 @@ public class NetUtils {
 
 	/**
 	 * Creates a compressed URL style representation of an Inet6Address.
-	 * 
+	 *
 	 * <p>This method copies and adopts code from Google's Guava library.
 	 * We re-implement this here in order to reduce dependency on Guava.
 	 * The Guava library has frequently caused dependency conflicts in the past.
 	 */
 	private static String getIPv6UrlRepresentation(Inet6Address address) {
+		return getIPv6UrlRepresentation(address.getAddress());
+	}
+
+	/**
+	 * Creates a compressed URL style representation of an Inet6Address.
+	 *
+	 * <p>This method copies and adopts code from Google's Guava library.
+	 * We re-implement this here in order to reduce dependency on Guava.
+	 * The Guava library has frequently caused dependency conflicts in the past.
+	 */
+	private static String getIPv6UrlRepresentation(byte[] addressBytes) {
 		// first, convert bytes to 16 bit chunks
-		byte[] addressBytes = address.getAddress();
 		int[] hextets = new int[8];
 		for (int i = 0; i < hextets.length; i++) {
 			hextets[i] = (addressBytes[2 * i] & 0xFF) << 8 | (addressBytes[2 * i + 1] & 0xFF);
@@ -309,6 +372,14 @@ public class NetUtils {
 		return null;
 	}
 
+	/**
+	 * Returns the wildcard address to listen on all interfaces.
+	 * @return Either 0.0.0.0 or :: depending on the IP setup.
+	 */
+	public static String getWildcardIPAddress() {
+		return WILDCARD_ADDRESS;
+	}
+
 	public interface SocketFactory {
 		ServerSocket createSocket(int port) throws IOException;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
index 72cc89e..03b21dd 100644
--- a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
@@ -23,6 +23,7 @@ import org.junit.Test;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
@@ -162,4 +163,62 @@ public class NetUtilsTest {
 		error = null;
 
 	}
+
+	@Test
+	public void testFormatAddress() throws UnknownHostException {
+		{
+			// IPv4
+			String host = "1.2.3.4";
+			int port = 42;
+			Assert.assertEquals(host + ":" + port, NetUtils.unresolvedHostAndPortToNormalizedString(host, port));
+		}
+		{
+			// IPv6
+			String host = "2001:0db8:85a3:0000:0000:8a2e:0370:7334";
+			int port = 42;
+			Assert.assertEquals("[2001:db8:85a3::8a2e:370:7334]:" + port, NetUtils.unresolvedHostAndPortToNormalizedString(host, port));
+		}
+		{
+			// Hostnames
+			String host = "somerandomhostname";
+			int port = 99;
+			Assert.assertEquals(host + ":" + port, NetUtils.unresolvedHostAndPortToNormalizedString(host, port));
+		}
+		{
+			// Whitespace
+			String host = "  somerandomhostname  ";
+			int port = 99;
+			Assert.assertEquals(host.trim() + ":" + port, NetUtils.unresolvedHostAndPortToNormalizedString(host, port));
+		}
+		{
+			// Illegal hostnames
+			String host = "illegalhost.";
+			int port = 42;
+			try {
+				NetUtils.unresolvedHostAndPortToNormalizedString(host, port);
+				fail();
+			} catch (Exception ignored) {}
+			// Illegal hostnames
+			host = "illegalhost:fasf";
+			try {
+				NetUtils.unresolvedHostAndPortToNormalizedString(host, port);
+				fail();
+			} catch (Exception ignored) {}
+		}
+		{
+			// Illegal port ranges
+			String host = "1.2.3.4";
+			int port = -1;
+			try {
+				NetUtils.unresolvedHostAndPortToNormalizedString(host, port);
+				fail();
+			} catch (Exception ignored) {}
+		}
+		{
+			// lower case conversion of hostnames
+			String host = "CamelCaseHostName";
+			int port = 99;
+			Assert.assertEquals(host.toLowerCase() + ":" + port, NetUtils.unresolvedHostAndPortToNormalizedString(host, port));
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index 751acda..c650cfe 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -21,14 +21,18 @@
 # Common
 #==============================================================================
 
-# The host on which the JobManager runs. Only used in non-high-availability mode.
-# The JobManager process will use this hostname to bind the listening servers to.
-# The TaskManagers will try to connect to the JobManager on that host.
+# The external address of the host on which the JobManager runs and can be
+# reached by the TaskManagers and any clients which want to connect. This setting
+# is only used in Standalone mode and may be overwritten on the JobManager side
+# by specifying the --host <hostname> parameter of the bin/jobmanager.sh executable.
+# In high availability mode, if you use the bin/start-cluster.sh script and setup
+# the conf/masters file, this will be taken care of automatically. Yarn/Mesos
+# automatically configure the host name based on the hostname of the node where the
+# JobManager runs.
 
 jobmanager.rpc.address: localhost
 
-
-# The port where the JobManager's main actor system listens for messages.
+# The RPC port where the JobManager is reachable.
 
 jobmanager.rpc.port: 6123
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-mesos/pom.xml
----------------------------------------------------------------------
diff --git a/flink-mesos/pom.xml b/flink-mesos/pom.xml
index 55e0472..69e0c84 100644
--- a/flink-mesos/pom.xml
+++ b/flink-mesos/pom.xml
@@ -61,13 +61,13 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-actor_${scala.binary.version}</artifactId>
+			<groupId>com.data-artisans</groupId>
+			<artifactId>flakka-actor_${scala.binary.version}</artifactId>
 		</dependency>
 
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-remote_${scala.binary.version}</artifactId>
+			<groupId>com.data-artisans</groupId>
+			<artifactId>flakka-remote_${scala.binary.version}</artifactId>
 			<exclusions>
 				<!-- exclude protobuf here to allow the mesos library to provide it -->
 				<exclusion>
@@ -78,8 +78,8 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-slf4j_${scala.binary.version}</artifactId>
+			<groupId>com.data-artisans</groupId>
+			<artifactId>flakka-slf4j_${scala.binary.version}</artifactId>
 		</dependency>
 
 		<dependency>
@@ -123,8 +123,8 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-testkit_${scala.binary.version}</artifactId>
+			<groupId>com.data-artisans</groupId>
+			<artifactId>flakka-testkit_${scala.binary.version}</artifactId>
 			<scope>test</scope>
 		</dependency>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
index cf66bcf..57f3e25 100644
--- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
+++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
@@ -194,9 +194,9 @@ under the License.
 									<exclude>org.scala-lang:scala-library</exclude>
 									<exclude>org.scala-lang:scala-compiler</exclude>
 									<exclude>org.scala-lang:scala-reflect</exclude>
-									<exclude>com.typesafe.akka:akka-actor_*</exclude>
-									<exclude>com.typesafe.akka:akka-remote_*</exclude>
-									<exclude>com.typesafe.akka:akka-slf4j_*</exclude>
+									<exclude>com.data-artisans:flakka-actor_*</exclude>
+									<exclude>com.data-artisans:flakka-remote_*</exclude>
+									<exclude>com.data-artisans:flakka-slf4j_*</exclude>
 									<exclude>io.netty:netty-all</exclude>
 									<exclude>io.netty:netty</exclude>
 									<exclude>commons-fileupload:commons-fileupload</exclude>

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
index afb9b50..24225f6 100644
--- a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
+++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
@@ -199,9 +199,9 @@ under the License.
 									<exclude>org.scala-lang:scala-library</exclude>
 									<exclude>org.scala-lang:scala-compiler</exclude>
 									<exclude>org.scala-lang:scala-reflect</exclude>
-									<exclude>com.typesafe.akka:akka-actor_*</exclude>
-									<exclude>com.typesafe.akka:akka-remote_*</exclude>
-									<exclude>com.typesafe.akka:akka-slf4j_*</exclude>
+									<exclude>com.data-artisans:flakka-actor_*</exclude>
+									<exclude>com.data-artisans:flakka-remote_*</exclude>
+									<exclude>com.data-artisans:flakka-slf4j_*</exclude>
 									<exclude>io.netty:netty-all</exclude>
 									<exclude>io.netty:netty</exclude>
 									<exclude>commons-fileupload:commons-fileupload</exclude>

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime-web/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml
index badb9ba..a6397a4 100644
--- a/flink-runtime-web/pom.xml
+++ b/flink-runtime-web/pom.xml
@@ -92,8 +92,8 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-testkit_${scala.binary.version}</artifactId>
+			<groupId>com.data-artisans</groupId>
+			<artifactId>flakka-testkit_${scala.binary.version}</artifactId>
 			<scope>test</scope>
 		</dependency>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index eec75c9..e522d77 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -92,18 +92,18 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-actor_${scala.binary.version}</artifactId>
+			<groupId>com.data-artisans</groupId>
+			<artifactId>flakka-actor_${scala.binary.version}</artifactId>
 		</dependency>
 
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-remote_${scala.binary.version}</artifactId>
+			<groupId>com.data-artisans</groupId>
+			<artifactId>flakka-remote_${scala.binary.version}</artifactId>
 		</dependency>
 
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-slf4j_${scala.binary.version}</artifactId>
+			<groupId>com.data-artisans</groupId>
+			<artifactId>flakka-slf4j_${scala.binary.version}</artifactId>
 		</dependency>
 
 		<dependency>
@@ -193,8 +193,8 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-testkit_${scala.binary.version}</artifactId>
+			<groupId>com.data-artisans</groupId>
+			<artifactId>flakka-testkit_${scala.binary.version}</artifactId>
 		</dependency>
 
 	</dependencies>

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
index 3598a29..c460345 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmanager;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * The command line parameters passed to the TaskManager.
+ * The command line parameters passed to the JobManager.
  */
 public class JobManagerCliOptions {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
index b6d9306..b18cdd0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
@@ -60,12 +60,26 @@ public class LeaderRetrievalUtils {
 	 */
 	public static LeaderRetrievalService createLeaderRetrievalService(Configuration configuration)
 		throws Exception {
+		return createLeaderRetrievalService(configuration, false);
+	}
+
+	/**
+	 * Creates a {@link LeaderRetrievalService} based on the provided {@link Configuration} object.
+	 *
+	 * @param configuration Configuration containing the settings for the {@link LeaderRetrievalService}
+	 * @param resolveInitialHostName If true, resolves the initial hostname
+	 * @return The {@link LeaderRetrievalService} specified in the configuration object
+	 * @throws Exception
+	 */
+	public static LeaderRetrievalService createLeaderRetrievalService(
+			Configuration configuration, boolean resolveInitialHostName)
+		throws Exception {
 
 		HighAvailabilityMode highAvailabilityMode = getRecoveryMode(configuration);
 
 		switch (highAvailabilityMode) {
 			case NONE:
-				return StandaloneUtils.createLeaderRetrievalService(configuration);
+				return StandaloneUtils.createLeaderRetrievalService(configuration, resolveInitialHostName);
 			case ZOOKEEPER:
 				return ZooKeeperUtils.createLeaderRetrievalService(configuration);
 			default:

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java
index 8998add..8436ced 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java
@@ -22,11 +22,11 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.util.NetUtils;
 import scala.Option;
 import scala.Tuple3;
 
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 
 /**
@@ -43,9 +43,24 @@ public final class StandaloneUtils {
 	 * @throws UnknownHostException
 	 */
 	public static StandaloneLeaderRetrievalService createLeaderRetrievalService(
-			Configuration configuration)
+		Configuration configuration)
 		throws UnknownHostException {
-		return createLeaderRetrievalService(configuration, null);
+		return createLeaderRetrievalService(configuration, false);
+	}
+
+	/**
+	 * Creates a {@link StandaloneLeaderRetrievalService} from the given configuration. The
+	 * host and port for the remote Akka URL are retrieved from the provided configuration.
+	 *
+	 * @param configuration Configuration instance containing the host and port information
+	 * @param resolveInitialHostName If true, resolves the hostname of the StandaloneLeaderRetrievalService
+	 * @return StandaloneLeaderRetrievalService
+	 * @throws UnknownHostException
+	 */
+	public static StandaloneLeaderRetrievalService createLeaderRetrievalService(
+			Configuration configuration, boolean resolveInitialHostName)
+		throws UnknownHostException {
+		return createLeaderRetrievalService(configuration, resolveInitialHostName, null);
 	}
 
 	/**
@@ -55,30 +70,35 @@ public final class StandaloneUtils {
 	 * for the remote Akka URL.
 	 *
 	 * @param configuration Configuration instance containing hte host and port information
+	 * @param resolveInitialHostName If true, resolves the hostname of the StandaloneLeaderRetrievalService
 	 * @param jobManagerName Name of the JobManager actor
 	 * @return StandaloneLeaderRetrievalService
 	 * @throws UnknownHostException if the host name cannot be resolved into an {@link InetAddress}
 	 */
 	public static StandaloneLeaderRetrievalService createLeaderRetrievalService(
 			Configuration configuration,
+			boolean resolveInitialHostName,
 			String jobManagerName)
 		throws UnknownHostException {
 
-
 		Tuple3<String, String, Object> stringIntPair = TaskManager.getAndCheckJobManagerAddress(configuration);
 
 		String protocol = stringIntPair._1();
 		String jobManagerHostname = stringIntPair._2();
 		int jobManagerPort = (Integer) stringIntPair._3();
-		InetSocketAddress hostPort;
 
-		try {
-			InetAddress inetAddress = InetAddress.getByName(jobManagerHostname);
-			hostPort = new InetSocketAddress(inetAddress, jobManagerPort);
-		}
-		catch (UnknownHostException e) {
-			throw new UnknownHostException("Cannot resolve the JobManager hostname '" + jobManagerHostname
+		// Do not try to resolve a hostname to prevent resolving to the wrong IP address
+		String hostPort = NetUtils.unresolvedHostAndPortToNormalizedString(jobManagerHostname, jobManagerPort);
+
+		if (resolveInitialHostName) {
+			try {
+				//noinspection ResultOfMethodCallIgnored
+				InetAddress.getByName(jobManagerHostname);
+			}
+			catch (UnknownHostException e) {
+				throw new UnknownHostException("Cannot resolve the JobManager hostname '" + jobManagerHostname
 					+ "' specified in the configuration");
+			}
 		}
 
 		String jobManagerAkkaUrl = JobManager.getRemoteJobManagerAkkaURL(

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 6a23c39..c7bea66 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.{Callable, TimeUnit}
 
 import akka.actor._
 import akka.pattern.{ask => akkaAsk}
-import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions, ConfigValueFactory}
+import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration}
 import org.apache.flink.runtime.net.SSLUtils
@@ -63,8 +63,8 @@ object AkkaUtils {
    * will be instantiated.
    *
    * @param configuration instance containing the user provided configuration values
-   * @param listeningAddress an optional tuple containing a hostname and a port to bind to. If the
-   *                         parameter is None, then a local actor system will be created.
+   * @param listeningAddress an optional tuple containing a bindAddress and a port to bind to.
+    *                        If the parameter is None, then a local actor system will be created.
    * @return created actor system
    */
   def createActorSystem(
@@ -102,21 +102,25 @@ object AkkaUtils {
    * specified, then the actor system will listen on the respective address.
    *
    * @param configuration instance containing the user provided configuration values
-   * @param listeningAddress optional tuple of hostname and port to listen on. If None is given,
-   *                         then an Akka config for local actor system will be returned
+   * @param externalAddress optional tuple of bindAddress and port to be reachable at.
+   *                        If None is given, then an Akka config for local actor system
+   *                        will be returned
    * @return Akka config
    */
   @throws(classOf[UnknownHostException])
   def getAkkaConfig(configuration: Configuration,
-                    listeningAddress: Option[(String, Int)]): Config = {
+                    externalAddress: Option[(String, Int)]): Config = {
     val defaultConfig = getBasicAkkaConfig(configuration)
 
-    listeningAddress match {
+    externalAddress match {
 
       case Some((hostname, port)) =>
-        val ipAddress = InetAddress.getByName(hostname)
-        val hostString = "\"" + NetUtils.ipAddressToUrlString(ipAddress) + "\""
-        val remoteConfig = getRemoteAkkaConfig(configuration, hostString, port)
+
+        val remoteConfig = getRemoteAkkaConfig(configuration,
+          // the wildcard IP lets us bind to all network interfaces
+          NetUtils.getWildcardIPAddress, port,
+          hostname, port)
+
         remoteConfig.withFallback(defaultConfig)
 
       case None =>
@@ -213,15 +217,19 @@ object AkkaUtils {
 
   /**
    * Creates a Akka config for a remote actor system listening on port on the network interface
-   * identified by hostname.
+   * identified by bindAddress.
    *
    * @param configuration instance containing the user provided configuration values
-   * @param hostname of the network interface to listen on
+   * @param bindAddress of the network interface to bind on
    * @param port to bind to or if 0 then Akka picks a free port automatically
+   * @param externalHostname The host name to expect for Akka messages
+   * @param externalPort The port to expect for Akka messages
    * @return Flink's Akka configuration for remote actor systems
    */
   private def getRemoteAkkaConfig(configuration: Configuration,
-                                  hostname: String, port: Int): Config = {
+                                  bindAddress: String, port: Int,
+                                  externalHostname: String, externalPort: Int): Config = {
+
     val akkaAskTimeout = Duration(configuration.getString(
       ConfigConstants.AKKA_ASK_TIMEOUT,
       ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT))
@@ -322,7 +330,8 @@ object AkkaUtils {
          |    netty {
          |      tcp {
          |        transport-class = "akka.remote.transport.netty.NettyTransport"
-         |        port = $port
+         |        port = $externalPort
+         |        bind-port = $port
          |        connection-timeout = $akkaTCPTimeout
          |        maximum-frame-size = $akkaFramesize
          |        tcp-nodelay = on
@@ -334,24 +343,29 @@ object AkkaUtils {
          |}
        """.stripMargin
 
-      val hostnameConfigString = if(hostname != null && hostname.nonEmpty){
-        s"""
-           |akka {
-           |  remote {
-           |    netty {
-           |      tcp {
-           |        hostname = $hostname
-           |      }
-           |    }
-           |  }
-           |}
-         """.stripMargin
-      }else{
-        // if hostname is null or empty, then leave hostname unspecified. Akka will pick
+    val effectiveHostname =
+      if (externalHostname != null && externalHostname.nonEmpty) {
+        externalHostname
+      } else {
+        // if bindAddress is null or empty, then leave bindAddress unspecified. Akka will pick
         // InetAddress.getLocalHost.getHostAddress
-        ""
+        "\"\""
       }
 
+    val hostnameConfigString =
+      s"""
+         |akka {
+         |  remote {
+         |    netty {
+         |      tcp {
+         |        hostname = $effectiveHostname
+         |        bind-hostname = $bindAddress
+         |      }
+         |    }
+         |  }
+         |}
+       """.stripMargin
+
     val sslConfigString = if (akkaEnableSSLConfig) {
       s"""
          |akka {

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 8c686cd..c5682e2 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -29,7 +29,7 @@ import akka.pattern.ask
 import grizzled.slf4j.Logger
 import org.apache.flink.api.common.JobID
 import org.apache.flink.api.common.time.Time
-import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration, HighAvailabilityOptions}
+import org.apache.flink.configuration._
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.core.io.InputSplitAssigner
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup
@@ -149,7 +149,7 @@ class JobManager(
     case Some(registry) =>
       val host = flinkConfiguration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
       Option(new JobManagerMetricGroup(
-        registry, NetUtils.ipAddressToUrlString(InetAddress.getByName(host))))
+        registry, NetUtils.unresolvedHostToNormalizedString(host)))
     case None =>
       log.warn("Could not instantiate JobManager metrics.")
       None
@@ -1923,8 +1923,8 @@ object JobManager {
     // parsing the command line arguments
     val (configuration: Configuration,
          executionMode: JobManagerMode,
-         listeningHost: String,
-         listeningPortRange: java.util.Iterator[Integer]) =
+         externalHostName: String,
+         portRange: java.util.Iterator[Integer]) =
     try {
       parseArgs(args)
     }
@@ -1939,14 +1939,14 @@ object JobManager {
     // we want to check that the JobManager hostname is in the config
     // if it is not in there, the actor system will bind to the loopback interface's
     // address and will not be reachable from anyone remote
-    if (listeningHost == null) {
+    if (externalHostName == null) {
       val message = "Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY +
         "' is missing (hostname/address to bind JobManager to)."
       LOG.error(message)
       System.exit(STARTUP_FAILURE_RETURN_CODE)
     }
 
-    if (!listeningPortRange.hasNext) {
+    if (!portRange.hasNext) {
       if (ZooKeeperUtils.isZooKeeperRecoveryMode(configuration)) {
         val message = "Config parameter '" + ConfigConstants.HA_JOB_MANAGER_PORT +
           "' does not specify a valid port range."
@@ -1954,7 +1954,7 @@ object JobManager {
         System.exit(STARTUP_FAILURE_RETURN_CODE)
       }
       else {
-        val message = s"Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY +
+        val message = s"Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY +
           "' does not specify a valid port."
         LOG.error(message)
         System.exit(STARTUP_FAILURE_RETURN_CODE)
@@ -1970,8 +1970,8 @@ object JobManager {
           runJobManager(
             configuration,
             executionMode,
-            listeningHost,
-            listeningPortRange)
+            externalHostName,
+            portRange)
         }
       })
     } catch {
@@ -2080,7 +2080,7 @@ object JobManager {
           override def createSocket(port: Int): ServerSocket = new ServerSocket(
             // Use the correct listening address, bound ports will only be
             // detected later by Akka.
-            port, 0, InetAddress.getByName(listeningAddress))
+            port, 0, InetAddress.getByName(NetUtils.getWildcardIPAddress))
         })
 
       val port =
@@ -2158,8 +2158,8 @@ object JobManager {
     * @param configuration The configuration object for the JobManager
     * @param executionMode The execution mode in which to run. Execution mode LOCAL with spawn an
     *                      additional TaskManager in the same process.
-    * @param listeningAddress The hostname where the JobManager should listen for messages.
-    * @param listeningPort The port where the JobManager should listen for messages
+    * @param externalHostname The hostname where the JobManager is reachable for rpc communication
+    * @param port The port where the JobManager is reachable for rpc communication
     * @param futureExecutor to run the JobManager's futures
     * @param ioExecutor to run blocking io operations
     * @param jobManagerClass The class of the JobManager to be started
@@ -2171,8 +2171,8 @@ object JobManager {
   def startActorSystemAndJobManagerActors(
       configuration: Configuration,
       executionMode: JobManagerMode,
-      listeningAddress: String,
-      listeningPort: Int,
+      externalHostname: String,
+      port: Int,
       futureExecutor: Executor,
       ioExecutor: Executor,
       jobManagerClass: Class[_ <: JobManager],
@@ -2180,16 +2180,15 @@ object JobManager {
       resourceManagerClass: Option[Class[_ <: FlinkResourceManager[_]]])
     : (ActorSystem, ActorRef, ActorRef, Option[WebMonitor], Option[ActorRef]) = {
 
-    LOG.info("Starting JobManager")
+    val hostPort = NetUtils.unresolvedHostAndPortToNormalizedString(externalHostname, port)
 
     // Bring up the job manager actor system first, bind it to the given address.
-    val hostPortUrl = NetUtils.hostAndPortToUrlString(listeningAddress, listeningPort)
-    LOG.info(s"Starting JobManager actor system at $hostPortUrl")
+    LOG.info(s"Starting JobManager actor system reachable at $hostPort")
 
     val jobManagerSystem = try {
       val akkaConfig = AkkaUtils.getAkkaConfig(
         configuration,
-        Some((listeningAddress, listeningPort))
+        Some((externalHostname, port))
       )
       if (LOG.isDebugEnabled) {
         LOG.debug("Using akka configuration\n " + akkaConfig)
@@ -2201,8 +2200,7 @@ object JobManager {
         if (t.isInstanceOf[org.jboss.netty.channel.ChannelException]) {
           val cause = t.getCause()
           if (cause != null && t.getCause().isInstanceOf[java.net.BindException]) {
-            val address = listeningAddress + ":" + listeningPort
-            throw new Exception("Unable to create JobManager at address " + address +
+            throw new Exception("Unable to create JobManager at address " + hostPort +
               " - " + cause.getMessage(), t)
           }
         }
@@ -2267,7 +2265,7 @@ object JobManager {
           configuration,
           ResourceID.generate(),
           jobManagerSystem,
-          listeningAddress,
+          externalHostname,
           Some(TaskManager.TASK_MANAGER_NAME),
           None,
           localTaskManagerCommunication = true,
@@ -2360,17 +2358,17 @@ object JobManager {
       }
     }
 
-    val config = parser.parse(args, new JobManagerCliOptions()).getOrElse {
+    val cliOptions = parser.parse(args, new JobManagerCliOptions()).getOrElse {
       throw new Exception(
         s"Invalid command line arguments: ${args.mkString(" ")}. Usage: ${parser.usage}")
     }
     
-    val configDir = config.getConfigDir()
+    val configDir = cliOptions.getConfigDir()
     
     if (configDir == null) {
       throw new Exception("Missing parameter '--configDir'")
     }
-    if (config.getJobManagerMode() == null) {
+    if (cliOptions.getJobManagerMode() == null) {
       throw new Exception("Missing parameter '--executionMode'")
     }
 
@@ -2391,12 +2389,12 @@ object JobManager {
       configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDir + "/..")
     }
 
-    if (config.getWebUIPort() >= 0) {
-      configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, config.getWebUIPort())
+    if (cliOptions.getWebUIPort() >= 0) {
+      configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, cliOptions.getWebUIPort())
     }
 
-    if (config.getHost() != null) {
-      configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, config.getHost())
+    if (cliOptions.getHost() != null) {
+      configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, cliOptions.getHost())
     }
 
     val host = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
@@ -2429,10 +2427,9 @@ object JobManager {
         String.valueOf(listeningPort)
       }
 
-    val executionMode = config.getJobManagerMode
-    val hostUrl = NetUtils.ipAddressToUrlString(InetAddress.getByName(host))
+    val executionMode = cliOptions.getJobManagerMode
 
-    LOG.info(s"Starting JobManager on $hostUrl:$portRange with execution mode $executionMode")
+    LOG.info(s"Starting JobManager on $host:$portRange with execution mode $executionMode")
 
     val portRangeIterator = NetUtils.getPortRangeFromString(portRange)
 
@@ -2735,20 +2732,18 @@ object JobManager {
    * where the JobManager's actor system runs.
    *
    * @param protocol The protocol to be used to connect to the remote JobManager's actor system.
-   * @param address The address of the JobManager's actor system.
+   * @param hostPort The external address of the JobManager's actor system in format host:port
    * @return The akka URL of the JobManager actor.
    */
   def getRemoteJobManagerAkkaURL(
       protocol: String,
-      address: InetSocketAddress,
+      hostPort: String,
       name: Option[String] = None)
     : String = {
 
     require(protocol == "akka.tcp" || protocol == "akka.ssl.tcp",
         "protocol field should be either akka.tcp or akka.ssl.tcp")
 
-    val hostPort = NetUtils.socketAddressToUrlString(address)
-
     getJobManagerAkkaURLHelper(s"$protocol://flink@$hostPort", name)
   }
 
@@ -2761,17 +2756,7 @@ object JobManager {
   def getRemoteJobManagerAkkaURL(config: Configuration) : String = {
     val (protocol, hostname, port) = TaskManager.getAndCheckJobManagerAddress(config)
 
-    var hostPort: InetSocketAddress = null
-
-    try {
-      val inetAddress: InetAddress = InetAddress.getByName(hostname)
-      hostPort = new InetSocketAddress(inetAddress, port)
-    }
-    catch {
-      case e: UnknownHostException =>
-        throw new UnknownHostException(s"Cannot resolve the JobManager hostname '$hostname' " +
-          s"specified in the configuration")
-    }
+    val hostPort = NetUtils.unresolvedHostAndPortToNormalizedString(hostname, port)
 
     JobManager.getRemoteJobManagerAkkaURL(protocol, hostPort, Option.empty)
   }
@@ -2794,15 +2779,6 @@ object JobManager {
     address + "/user/" + name.getOrElse(JOB_MANAGER_NAME)
   }
 
-  def getJobManagerActorRefFuture(
-      protocol: String,
-      address: InetSocketAddress,
-      system: ActorSystem,
-      timeout: FiniteDuration)
-    : Future[ActorRef] = {
-    AkkaUtils.getActorRefFuture(getRemoteJobManagerAkkaURL(protocol, address), system, timeout)
-  }
-
   /**
    * Resolves the JobManager actor reference in a blocking fashion.
    *
@@ -2825,7 +2801,7 @@ object JobManager {
    * Resolves the JobManager actor reference in a blocking fashion.
    *
    * @param protocol The protocol to be used to connect to the remote JobManager's actor system.
-   * @param address The socket address of the JobManager's actor system.
+   * @param hostPort The external address of the JobManager's actor system in format host:port.
    * @param system The local actor system that should perform the lookup.
    * @param timeout The maximum time to wait until the lookup fails.
    * @throws java.io.IOException Thrown, if the lookup fails.
@@ -2834,19 +2810,19 @@ object JobManager {
   @throws(classOf[IOException])
   def getJobManagerActorRef(
       protocol: String,
-      address: InetSocketAddress,
+      hostPort: String,
       system: ActorSystem,
       timeout: FiniteDuration)
     : ActorRef = {
 
-    val jmAddress = getRemoteJobManagerAkkaURL(protocol, address)
+    val jmAddress = getRemoteJobManagerAkkaURL(protocol, hostPort)
     getJobManagerActorRef(jmAddress, system, timeout)
   }
 
   /**
    * Resolves the JobManager actor reference in a blocking fashion.
    *
-   * @param address The socket address of the JobManager's actor system.
+   * @param hostPort The address of the JobManager's actor system in format host:port.
    * @param system The local actor system that should perform the lookup.
    * @param config The config describing the maximum time to wait until the lookup fails.
    * @throws java.io.IOException Thrown, if the lookup fails.
@@ -2854,13 +2830,13 @@ object JobManager {
    */
   @throws(classOf[IOException])
   def getJobManagerActorRef(
-      address: InetSocketAddress,
+      hostPort: String,
       system: ActorSystem,
       config: Configuration)
     : ActorRef = {
 
     val timeout = AkkaUtils.getLookupTimeout(config)
     val protocol = AkkaUtils.getAkkaProtocol(config)
-    getJobManagerActorRef(protocol, address, system, timeout)
+    getJobManagerActorRef(protocol, hostPort, system, timeout)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index dc59048..88d7b3a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -69,7 +69,7 @@ abstract class FlinkMiniCluster(
   // not getLocalHost(), which may be 127.0.1.1
   val hostname = userConfiguration.getString(
     ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
-    InetAddress.getByName("localhost").getHostAddress())
+    "localhost")
 
   protected val originalConfiguration = generateConfiguration(userConfiguration)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 41d3077..ec5ff3d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -2089,9 +2089,12 @@ object TaskManager {
 
     val leaderRetrievalService = leaderRetrievalServiceOption match {
       case Some(lrs) => lrs
-      case None => LeaderRetrievalUtils.createLeaderRetrievalService(configuration)
+      case None =>
+        // validate the address if possible (e.g. we're in Standalone mode)
+        LeaderRetrievalUtils.createLeaderRetrievalService(configuration, true)
     }
 
+
     val metricsRegistry = new FlinkMetricRegistry(
       MetricRegistryConfiguration.fromConfiguration(configuration))
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
index fbe6e8f..49c28e6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
@@ -29,6 +29,7 @@ import akka.actor.PoisonPill;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.NetUtils;
 import org.junit.Test;
 
 import org.apache.flink.configuration.Configuration;
@@ -40,7 +41,6 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.StringWriter;
-import java.net.InetSocketAddress;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -119,7 +119,7 @@ public class JobManagerProcessReapingTest {
 				try {
 					jobManagerRef = JobManager.getJobManagerActorRef(
 						"akka.tcp",
-						new InetSocketAddress("localhost", jobManagerPort),
+						NetUtils.unresolvedHostAndPortToNormalizedString("localhost", jobManagerPort),
 						localSystem, new FiniteDuration(25, TimeUnit.SECONDS));
 				} catch (Throwable t) {
 					// job manager probably not ready yet

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
index 58761cb..0ab1b67 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
@@ -82,7 +82,7 @@ public class JobManagerStartupTest {
 		
 		try {
 			portNum = NetUtils.getAvailablePort();
-			portOccupier = new ServerSocket(portNum, 10, InetAddress.getByName("localhost"));
+			portOccupier = new ServerSocket(portNum, 10, InetAddress.getByName("0.0.0.0"));
 		}
 		catch (Throwable t) {
 			// could not find free port, or open a connection there
@@ -95,7 +95,7 @@ public class JobManagerStartupTest {
 		}
 		catch (Exception e) {
 			// expected
-			List<Throwable> causes = StartupUtils.getExceptionCauses(e,new ArrayList<Throwable>());
+			List<Throwable> causes = StartupUtils.getExceptionCauses(e, new ArrayList<Throwable>());
 			for(Throwable cause:causes) {
 				if(cause instanceof BindException) {
 					throw (BindException) cause;

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
index 8b8987b..6a8ff17 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
@@ -103,10 +104,10 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 			client[0] = ZooKeeperUtils.startCuratorFramework(config);
 			client[1] = ZooKeeperUtils.startCuratorFramework(config);
 
-			InetSocketAddress wrongInetSocketAddress = new InetSocketAddress(InetAddress.getByName("1.1.1.1"), 1234);
+			String wrongHostPort = NetUtils.unresolvedHostAndPortToNormalizedString("1.1.1.1", 1234);
 
 			String wrongAddress = JobManager.getRemoteJobManagerAkkaURL(AkkaUtils.getAkkaProtocol(config),
-					wrongInetSocketAddress, Option.<String>empty());
+					wrongHostPort, Option.<String>empty());
 
 			try {
 				localHost = InetAddress.getLocalHost();
@@ -123,9 +124,10 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 			}
 
 			InetSocketAddress correctInetSocketAddress = new InetSocketAddress(localHost, serverSocket.getLocalPort());
+			String hostPort = NetUtils.unresolvedHostAndPortToNormalizedString(localHost.getHostName(), correctInetSocketAddress.getPort());
 
 			String correctAddress = JobManager.getRemoteJobManagerAkkaURL(AkkaUtils.getAkkaProtocol(config),
-					correctInetSocketAddress, Option.<String>empty());
+				hostPort, Option.<String>empty());
 
 			faultyLeaderElectionService = ZooKeeperUtils.createLeaderElectionService(client[0], config);
 			TestingContender wrongLeaderAddressContender = new TestingContender(wrongAddress, faultyLeaderElectionService);

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
index dead732..385d1ac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
@@ -40,7 +40,6 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.StringWriter;
-import java.net.InetAddress;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
@@ -91,11 +90,10 @@ public abstract class TaskManagerProcessReapingTestBase {
 			tempLogFile.deleteOnExit();
 			CommonTestUtils.printLog4jDebugConfig(tempLogFile);
 
-			final InetAddress localhost = InetAddress.getByName("localhost");
 			final int jobManagerPort = NetUtils.getAvailablePort();
 
 			// start a JobManager
-			Tuple2<String, Object> localAddress = new Tuple2<String, Object>(localhost.getHostAddress(), jobManagerPort);
+			Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
 			jmActorSystem = AkkaUtils.createActorSystem(
 					new Configuration(), new Some<Tuple2<String, Object>>(localAddress));
 
@@ -137,7 +135,7 @@ public abstract class TaskManagerProcessReapingTestBase {
 			// grab the reference to the TaskManager. try multiple times, until the process
 			// is started and the TaskManager is up
 			String taskManagerActorName = String.format("akka.tcp://flink@%s/user/%s",
-					org.apache.flink.util.NetUtils.ipAddressAndPortToUrlString(localhost, taskManagerPort),
+					"localhost:" + taskManagerPort,
 					TaskManager.TASK_MANAGER_NAME());
 
 			ActorRef taskManagerRef = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
index 686de76..b2a905d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.util.StartupUtils;
+import org.apache.flink.util.NetUtils;
 import org.junit.Test;
 import scala.Option;
 
@@ -57,10 +58,10 @@ public class TaskManagerStartupTest {
 		ServerSocket blocker = null;
 		try {
 			final String localHostName = "localhost";
-			final InetAddress localAddress = InetAddress.getByName(localHostName);
+			final InetAddress localBindAddress = InetAddress.getByName(NetUtils.getWildcardIPAddress());
 
 			// block some port
-			blocker = new ServerSocket(0, 50, localAddress);
+			blocker = new ServerSocket(0, 50, localBindAddress);
 			final int port = blocker.getLocalPort();
 
 			TaskManager.runTaskManager(localHostName, ResourceID.generate(), port, new Configuration(),
@@ -69,7 +70,7 @@ public class TaskManagerStartupTest {
 
 		}
 		catch (IOException e) {
-			// expected. validate the error messagex
+			// expected. validate the error message
 			List<Throwable> causes = StartupUtils.getExceptionCauses(e, new ArrayList<Throwable>());
 			for (Throwable cause : causes) {
 				if (cause instanceof BindException) {

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
index 387b0fd..48c65c0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
@@ -25,13 +25,13 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.JobManagerMode;
+import org.apache.flink.util.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
@@ -148,7 +148,7 @@ public class JobManagerProcess extends TestJvmProcess {
 
 		return JobManager.getRemoteJobManagerAkkaURL(
 				AkkaUtils.getAkkaProtocol(config),
-				new InetSocketAddress("localhost", port),
+				NetUtils.unresolvedHostAndPortToNormalizedString("localhost", port),
 				Option.<String>empty());
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
index a18024f..0daac2e 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
@@ -21,8 +21,9 @@ package org.apache.flink.runtime.akka
 import java.net.InetSocketAddress
 
 import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.util.NetUtils
 import org.junit.runner.RunWith
-import org.scalatest.{FunSuite, BeforeAndAfterAll, Matchers}
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
 import org.scalatest.junit.JUnitRunner
 
 @RunWith(classOf[JUnitRunner])
@@ -39,7 +40,7 @@ class AkkaUtilsTest
 
     val remoteAkkaURL = JobManager.getRemoteJobManagerAkkaURL(
       "akka.tcp",
-      address,
+      NetUtils.unresolvedHostAndPortToNormalizedString(host, port),
       Some("actor"))
 
     val result = AkkaUtils.getInetSockeAddressFromAkkaURL(remoteAkkaURL)

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
index 6013309..1489fb2 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
@@ -53,7 +53,7 @@ class JobManagerConnectionTest {
         case _ : Throwable => return
       }
 
-      val endpoint = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), freePort)
+      val endpoint = NetUtils.unresolvedHostAndPortToNormalizedString("127.0.0.1", freePort)
       val config = createConfigWithLowTimeout()
 
       mustReturnWithinTimeout(Duration(5*timeout, TimeUnit.MILLISECONDS)) {
@@ -89,7 +89,7 @@ class JobManagerConnectionTest {
 
     try {
       // some address that is not running a JobManager
-      val endpoint = new InetSocketAddress(InetAddress.getByName("10.254.254.254"), 2)
+      val endpoint = NetUtils.unresolvedHostAndPortToNormalizedString("10.254.254.254", 2)
       val config = createConfigWithLowTimeout()
 
       mustReturnWithinTimeout(Duration(5*timeout, TimeUnit.MILLISECONDS)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index efc95ab..7929e27 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -168,8 +168,8 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-testkit_${scala.binary.version}</artifactId>
+			<groupId>com.data-artisans</groupId>
+			<artifactId>flakka-testkit_${scala.binary.version}</artifactId>
 		</dependency>
 		
 		<dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-yarn-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
index 3c2bc67..6baf5a8 100644
--- a/flink-yarn-tests/pom.xml
+++ b/flink-yarn-tests/pom.xml
@@ -99,8 +99,8 @@ under the License.
 		</dependency>
 		
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-testkit_${scala.binary.version}</artifactId>
+			<groupId>com.data-artisans</groupId>
+			<artifactId>flakka-testkit_${scala.binary.version}</artifactId>
 			<scope>test</scope>
 		</dependency>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 72a7122..d959e14 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -90,7 +90,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 
 	/**
 	 * Tests that the application master can be killed multiple times and that the surviving
-	 * TaskManager succesfully reconnects to the newly started JobManager.
+	 * TaskManager successfully reconnects to the newly started JobManager.
 	 * @throws Exception
 	 */
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn/pom.xml b/flink-yarn/pom.xml
index 9a3cc8e..39d9379 100644
--- a/flink-yarn/pom.xml
+++ b/flink-yarn/pom.xml
@@ -65,18 +65,18 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-actor_${scala.binary.version}</artifactId>
+			<groupId>com.data-artisans</groupId>
+			<artifactId>flakka-actor_${scala.binary.version}</artifactId>
 		</dependency>
 
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-remote_${scala.binary.version}</artifactId>
+			<groupId>com.data-artisans</groupId>
+			<artifactId>flakka-remote_${scala.binary.version}</artifactId>
 		</dependency>
 
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-camel_${scala.binary.version}</artifactId>
+			<groupId>com.data-artisans</groupId>
+			<artifactId>flakka-camel_${scala.binary.version}</artifactId>
 		</dependency>
 
 		<dependency>
@@ -86,8 +86,8 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-testkit_${scala.binary.version}</artifactId>
+			<groupId>com.data-artisans</groupId>
+			<artifactId>flakka-testkit_${scala.binary.version}</artifactId>
 			<scope>test</scope>
 		</dependency>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 670f8a2..5606719 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -625,7 +625,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 			}
 			//------------------ ClusterClient deployed, handle connection details
 			String jobManagerAddress =
-				yarnCluster.getJobManagerAddress().getAddress().getHostAddress() +
+				yarnCluster.getJobManagerAddress().getAddress().getHostName() +
 					":" + yarnCluster.getJobManagerAddress().getPort();
 
 			System.out.println("Flink JobManager is now running on " + jobManagerAddress);

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 04ba726..0842517 100644
--- a/pom.xml
+++ b/pom.xml
@@ -93,7 +93,7 @@ under the License.
 		<log4j.configuration>log4j-test.properties</log4j.configuration>
 		<slf4j.version>1.7.7</slf4j.version>
 		<guava.version>18.0</guava.version>
-		<akka.version>2.3.7</akka.version>
+		<akka.version>2.3-custom</akka.version>
 		<java.version>1.7</java.version>
 		<scala.macros.version>2.0.1</scala.macros.version>
 		<!-- Default scala versions, may be overwritten by build profiles -->
@@ -334,32 +334,32 @@ under the License.
 			</dependency>
 
 			<dependency>
-				<groupId>com.typesafe.akka</groupId>
-				<artifactId>akka-actor_${scala.binary.version}</artifactId>
+				<groupId>com.data-artisans</groupId>
+				<artifactId>flakka-actor_${scala.binary.version}</artifactId>
 				<version>${akka.version}</version>
 			</dependency>
 
 			<dependency>
-				<groupId>com.typesafe.akka</groupId>
-				<artifactId>akka-remote_${scala.binary.version}</artifactId>
+				<groupId>com.data-artisans</groupId>
+				<artifactId>flakka-remote_${scala.binary.version}</artifactId>
 				<version>${akka.version}</version>
 			</dependency>
 
 			<dependency>
-				<groupId>com.typesafe.akka</groupId>
-				<artifactId>akka-slf4j_${scala.binary.version}</artifactId>
+				<groupId>com.data-artisans</groupId>
+				<artifactId>flakka-slf4j_${scala.binary.version}</artifactId>
 				<version>${akka.version}</version>
 			</dependency>
 
 			<dependency>
-				<groupId>com.typesafe.akka</groupId>
-				<artifactId>akka-camel_${scala.binary.version}</artifactId>
+				<groupId>com.data-artisans</groupId>
+				<artifactId>flakka-camel_${scala.binary.version}</artifactId>
 				<version>${akka.version}</version>
 			</dependency>
 
 			<dependency>
-				<groupId>com.typesafe.akka</groupId>
-				<artifactId>akka-testkit_${scala.binary.version}</artifactId>
+				<groupId>com.data-artisans</groupId>
+				<artifactId>flakka-testkit_${scala.binary.version}</artifactId>
 				<version>${akka.version}</version>
 				<scope>test</scope>
 			</dependency>
@@ -499,8 +499,8 @@ under the License.
 							com.sun.tools.javac.code.Symbol$CompletionFailure:
 								class file for akka.testkit.TestKit not found"
 					-->
-					<groupId>com.typesafe.akka</groupId>
-					<artifactId>akka-testkit_${scala.binary.version}</artifactId>
+					<groupId>com.data-artisans</groupId>
+					<artifactId>flakka-testkit_${scala.binary.version}</artifactId>
 					<version>${akka.version}</version>
 					<scope>provided</scope>
 				</dependency>
@@ -627,8 +627,8 @@ under the License.
 							com.sun.tools.javac.code.Symbol$CompletionFailure:
 								class file for akka.testkit.TestKit not found"
 					-->
-					<groupId>com.typesafe.akka</groupId>
-					<artifactId>akka-testkit_${scala.binary.version}</artifactId>
+					<groupId>com.data-artisans</groupId>
+					<artifactId>flakka-testkit_${scala.binary.version}</artifactId>
 					<version>${akka.version}</version>
 					<scope>provided</scope>
 				</dependency>