You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/12/16 17:00:34 UTC

[1/7] flink git commit: [FLINK-5344] Fixed the dockerized doc build, which has been broken for a while. Fixed the -p option. Reverted the main Gemfile back to ruby 1.9 to make the build bot happy, and created a new Gemfile in ruby2/Gemfile to keep the incremental build option available.

Repository: flink
Updated Branches:
  refs/heads/master 67c4be648 -> 4a27d2105


[FLINK-5344] Fixed the dockerized doc build, which has been broken for a while. Fixed the -p option. Reverted the main Gemfile back to ruby 1.9 to make the build bot happy, and created a new Gemfile in ruby2/Gemfile to keep the incremental build option available.

This closes #3016.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4a27d210
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4a27d210
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4a27d210

Branch: refs/heads/master
Commit: 4a27d2105dd08f323c0be26e79a55986aa97e7bd
Parents: 27ebdf7
Author: David Anderson <da...@alpinegizmo.com>
Authored: Fri Dec 16 08:49:57 2016 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Dec 16 17:51:59 2016 +0100

----------------------------------------------------------------------
 .gitignore              |  2 ++
 docs/Gemfile            | 14 ++++-----
 docs/Gemfile.lock       | 74 ++++++++++++++++++++++++++++----------------
 docs/README.md          | 10 +++++-
 docs/build_docs.sh      |  7 +++--
 docs/ruby2/Gemfile      | 30 ++++++++++++++++++
 docs/ruby2/Gemfile.lock | 74 ++++++++++++++++++++++++++++++++++++++++++++
 pom.xml                 |  1 +
 8 files changed, 175 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4a27d210/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index db3e2ea..1b9c64e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -27,5 +27,7 @@ out/
 /docs/content
 /docs/.bundle
 /docs/.rubydeps
+/docs/ruby2/.bundle
+/docs/ruby2/.rubydeps
 *.ipr
 *.iws

http://git-wip-us.apache.org/repos/asf/flink/blob/4a27d210/docs/Gemfile
----------------------------------------------------------------------
diff --git a/docs/Gemfile b/docs/Gemfile
index c1a1555..ce2f435 100644
--- a/docs/Gemfile
+++ b/docs/Gemfile
@@ -17,13 +17,13 @@
 ################################################################################
 
 source 'https://rubygems.org'
-ruby '~> 2.3.0'
 
-# Dependencies required to build the Flink docs
-gem 'jekyll', '~> 3.3.0'
-gem 'kramdown', '~> 1.13.0'
+ruby '>= 1.9.0'
+
+gem 'jekyll', '2.5.3'
+gem 'kramdown', '1.10.0'
+gem 'addressable', '2.4.0'
+gem 'octokit', '~> 4.3.0'
 gem 'pygments.rb', '0.6.3'
 gem 'therubyracer', '0.12.2'
-group :jekyll_plugins do
-  gem 'hawkins'
-end
+gem 'json'

http://git-wip-us.apache.org/repos/asf/flink/blob/4a27d210/docs/Gemfile.lock
----------------------------------------------------------------------
diff --git a/docs/Gemfile.lock b/docs/Gemfile.lock
index da7e04a..bcd107b 100644
--- a/docs/Gemfile.lock
+++ b/docs/Gemfile.lock
@@ -1,67 +1,87 @@
 GEM
   remote: https://rubygems.org/
   specs:
-    addressable (2.5.0)
-      public_suffix (~> 2.0, >= 2.0.2)
-    colorator (1.1.0)
-    em-websocket (0.5.1)
-      eventmachine (>= 0.12.9)
-      http_parser.rb (~> 0.6.0)
-    eventmachine (1.2.1)
+    addressable (2.4.0)
+    blankslate (2.1.2.4)
+    classifier-reborn (2.0.4)
+      fast-stemmer (~> 1.0)
+    coffee-script (2.4.1)
+      coffee-script-source
+      execjs
+    coffee-script-source (1.11.1)
+    colorator (0.1)
+    execjs (2.7.0)
+    faraday (0.9.2)
+      multipart-post (>= 1.2, < 3)
+    fast-stemmer (1.0.2)
     ffi (1.9.14)
-    forwardable-extended (2.6.0)
-    hawkins (2.0.4)
-      em-websocket (~> 0.5)
-      jekyll (~> 3.1)
-    http_parser.rb (0.6.0)
-    jekyll (3.3.1)
-      addressable (~> 2.4)
-      colorator (~> 1.0)
+    jekyll (2.5.3)
+      classifier-reborn (~> 2.0)
+      colorator (~> 0.1)
+      jekyll-coffeescript (~> 1.0)
+      jekyll-gist (~> 1.0)
+      jekyll-paginate (~> 1.0)
       jekyll-sass-converter (~> 1.0)
       jekyll-watch (~> 1.1)
       kramdown (~> 1.3)
-      liquid (~> 3.0)
+      liquid (~> 2.6.1)
       mercenary (~> 0.3.3)
-      pathutil (~> 0.9)
-      rouge (~> 1.7)
+      pygments.rb (~> 0.6.0)
+      redcarpet (~> 3.1)
       safe_yaml (~> 1.0)
+      toml (~> 0.1.0)
+    jekyll-coffeescript (1.0.1)
+      coffee-script (~> 2.2)
+    jekyll-gist (1.4.0)
+      octokit (~> 4.3.0)
+    jekyll-paginate (1.1.0)
     jekyll-sass-converter (1.5.0)
       sass (~> 3.4)
     jekyll-watch (1.5.0)
       listen (~> 3.0, < 3.1)
-    kramdown (1.13.1)
+    json (1.8.3)
+    kramdown (1.10.0)
     libv8 (3.16.14.17)
-    liquid (3.0.6)
+    liquid (2.6.3)
     listen (3.0.8)
       rb-fsevent (~> 0.9, >= 0.9.4)
       rb-inotify (~> 0.9, >= 0.9.7)
     mercenary (0.3.6)
-    pathutil (0.14.0)
-      forwardable-extended (~> 2.6)
+    multipart-post (2.0.0)
+    octokit (4.3.0)
+      sawyer (~> 0.7.0, >= 0.5.3)
+    parslet (1.5.0)
+      blankslate (~> 2.0)
     posix-spawn (0.3.12)
-    public_suffix (2.0.4)
     pygments.rb (0.6.3)
       posix-spawn (~> 0.3.6)
       yajl-ruby (~> 1.2.0)
     rb-fsevent (0.9.8)
     rb-inotify (0.9.7)
       ffi (>= 0.5.0)
+    redcarpet (3.3.4)
     ref (2.0.0)
-    rouge (1.11.1)
     safe_yaml (1.0.4)
     sass (3.4.22)
+    sawyer (0.7.0)
+      addressable (>= 2.3.5, < 2.5)
+      faraday (~> 0.8, < 0.10)
     therubyracer (0.12.2)
       libv8 (~> 3.16.14.0)
       ref
+    toml (0.1.2)
+      parslet (~> 1.5.0)
     yajl-ruby (1.2.1)
 
 PLATFORMS
   ruby
 
 DEPENDENCIES
-  hawkins
-  jekyll (~> 3.3.0)
-  kramdown (~> 1.13.0)
+  addressable (= 2.4.0)
+  jekyll (= 2.5.3)
+  json
+  kramdown (= 1.10.0)
+  octokit (~> 4.3.0)
   pygments.rb (= 0.6.3)
   therubyracer (= 0.12.2)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4a27d210/docs/README.md
----------------------------------------------------------------------
diff --git a/docs/README.md b/docs/README.md
index 879c33b..d243b04 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -39,7 +39,15 @@ to `docs/content/index.html` and start reading.
 
 If you call the script with the preview flag `build_docs.sh -p`, Jekyll will
 start a web server at `localhost:4000` and watch the docs directory for
-updates. Use this mode to preview changes locally.
+updates. Use this mode to preview changes locally. 
+
+If you have ruby 2.0 or greater, 
+you can call the script with the incremental flag `build_docs.sh -i`.
+Jekyll will then serve a live preview at `localhost:4000`,
+and it will be much faster because it will only rebuild the pages corresponding
+to files that are modified. Note that if you are making changes that affect
+the sidebar navigation, you'll have to build the entire site to see
+those changes reflected on every page.
 
 # Contribute
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4a27d210/docs/build_docs.sh
----------------------------------------------------------------------
diff --git a/docs/build_docs.sh b/docs/build_docs.sh
index df83ac4..cbdd1d4 100755
--- a/docs/build_docs.sh
+++ b/docs/build_docs.sh
@@ -46,13 +46,16 @@ DOCS_DST=${DOCS_SRC}/content
 JEKYLL_CMD="build"
 
 # if -p flag is provided, serve site on localhost
-# -i is like -p, but incremental (which has some issues, but is very fast)
-while getopts ":p:i" opt; do
+# -i is like -p, but incremental (only rebuilds the modified file)
+while getopts "pi" opt; do
 	case $opt in
 		p)
 		JEKYLL_CMD="serve --baseurl= --watch"
 		;;
 		i)
+		[[ `ruby -v` =~ 'ruby 1' ]] && echo "Error: building the docs with the incremental option requires at least ruby 2.0" && exit 1
+		cd ruby2
+		bundle install --path .rubydeps
 		JEKYLL_CMD="liveserve --baseurl= --watch --incremental"
 		;;
 	esac

http://git-wip-us.apache.org/repos/asf/flink/blob/4a27d210/docs/ruby2/Gemfile
----------------------------------------------------------------------
diff --git a/docs/ruby2/Gemfile b/docs/ruby2/Gemfile
new file mode 100644
index 0000000..34afb1c
--- /dev/null
+++ b/docs/ruby2/Gemfile
@@ -0,0 +1,30 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source 'https://rubygems.org'
+
+ruby '~> 2'
+
+gem 'jekyll', '~> 3.3.0'
+gem 'kramdown', '~> 1.13.0'
+gem 'json'
+group :jekyll_plugins do
+  gem 'hawkins'
+end
+gem 'pygments.rb', '0.6.3'
+gem 'therubyracer', '0.12.2'

http://git-wip-us.apache.org/repos/asf/flink/blob/4a27d210/docs/ruby2/Gemfile.lock
----------------------------------------------------------------------
diff --git a/docs/ruby2/Gemfile.lock b/docs/ruby2/Gemfile.lock
new file mode 100644
index 0000000..e17954d
--- /dev/null
+++ b/docs/ruby2/Gemfile.lock
@@ -0,0 +1,74 @@
+GEM
+  remote: https://rubygems.org/
+  specs:
+    addressable (2.5.0)
+      public_suffix (~> 2.0, >= 2.0.2)
+    colorator (1.1.0)
+    em-websocket (0.5.1)
+      eventmachine (>= 0.12.9)
+      http_parser.rb (~> 0.6.0)
+    eventmachine (1.2.1)
+    ffi (1.9.14)
+    forwardable-extended (2.6.0)
+    hawkins (2.0.4)
+      em-websocket (~> 0.5)
+      jekyll (~> 3.1)
+    http_parser.rb (0.6.0)
+    jekyll (3.3.1)
+      addressable (~> 2.4)
+      colorator (~> 1.0)
+      jekyll-sass-converter (~> 1.0)
+      jekyll-watch (~> 1.1)
+      kramdown (~> 1.3)
+      liquid (~> 3.0)
+      mercenary (~> 0.3.3)
+      pathutil (~> 0.9)
+      rouge (~> 1.7)
+      safe_yaml (~> 1.0)
+    jekyll-sass-converter (1.5.0)
+      sass (~> 3.4)
+    jekyll-watch (1.5.0)
+      listen (~> 3.0, < 3.1)
+    json (2.0.2)
+    kramdown (1.13.1)
+    libv8 (3.16.14.17)
+    liquid (3.0.6)
+    listen (3.0.8)
+      rb-fsevent (~> 0.9, >= 0.9.4)
+      rb-inotify (~> 0.9, >= 0.9.7)
+    mercenary (0.3.6)
+    pathutil (0.14.0)
+      forwardable-extended (~> 2.6)
+    posix-spawn (0.3.12)
+    public_suffix (2.0.4)
+    pygments.rb (0.6.3)
+      posix-spawn (~> 0.3.6)
+      yajl-ruby (~> 1.2.0)
+    rb-fsevent (0.9.8)
+    rb-inotify (0.9.7)
+      ffi (>= 0.5.0)
+    ref (2.0.0)
+    rouge (1.11.1)
+    safe_yaml (1.0.4)
+    sass (3.4.22)
+    therubyracer (0.12.2)
+      libv8 (~> 3.16.14.0)
+      ref
+    yajl-ruby (1.2.1)
+
+PLATFORMS
+  ruby
+
+DEPENDENCIES
+  hawkins
+  jekyll (~> 3.3.0)
+  json
+  kramdown (~> 1.13.0)
+  pygments.rb (= 0.6.3)
+  therubyracer (= 0.12.2)
+
+RUBY VERSION
+   ruby 2.3.0p0
+
+BUNDLED WITH
+   1.13.6

http://git-wip-us.apache.org/repos/asf/flink/blob/4a27d210/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0842517..9676976 100644
--- a/pom.xml
+++ b/pom.xml
@@ -815,6 +815,7 @@ under the License.
 						<!-- External web libraries. -->
 						<exclude>docs/**/bootstrap*</exclude>
 						<exclude>docs/Gemfile.lock</exclude>
+						<exclude>docs/ruby2/Gemfile.lock</exclude>
 						<exclude>docs/img/*.svg</exclude>
 						<exclude>**/docs/page/font-awesome/**</exclude>
 						<exclude>**/resources/**/font-awesome/**</exclude>


[4/7] flink git commit: [docker] improve Dockerfile host configuration

Posted by mx...@apache.org.
[docker] improve Dockerfile host configuration

- configure job manager address for both operation modes
- introduce argument to specify the external job manager address
- replace ARG with ENV for backwards-compatibility
- EXPOSE web port and RPC port

This closes #2981.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e1e1395
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e1e1395
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e1e1395

Branch: refs/heads/master
Commit: 6e1e13958251af18555193160d278c6ffcdd0539
Parents: 67c4be6
Author: Maximilian Michels <mx...@apache.org>
Authored: Fri Dec 9 17:58:30 2016 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Dec 16 17:51:59 2016 +0100

----------------------------------------------------------------------
 flink-contrib/docker-flink/Dockerfile           | 13 +++++++++----
 .../docker-flink/docker-compose-bluemix.yml     |  2 ++
 flink-contrib/docker-flink/docker-compose.yml   |  4 ++++
 flink-contrib/docker-flink/docker-entrypoint.sh | 20 +++++++++++---------
 4 files changed, 26 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6e1e1395/flink-contrib/docker-flink/Dockerfile
----------------------------------------------------------------------
diff --git a/flink-contrib/docker-flink/Dockerfile b/flink-contrib/docker-flink/Dockerfile
index d5a4432..ce5271dc 100644
--- a/flink-contrib/docker-flink/Dockerfile
+++ b/flink-contrib/docker-flink/Dockerfile
@@ -22,15 +22,20 @@ FROM java:8-jre-alpine
 RUN apk add --no-cache bash snappy
 
 # Configure Flink version
-ARG FLINK_VERSION=1.1.1
-ARG HADOOP_VERSION=27
-ARG SCALA_VERSION=2.11
+ENV FLINK_VERSION=1.1.1
+ENV HADOOP_VERSION=27
+ENV SCALA_VERSION=2.11
 
 # Flink environment variables
-ARG FLINK_INSTALL_PATH=/opt
+ENV FLINK_INSTALL_PATH=/opt
 ENV FLINK_HOME $FLINK_INSTALL_PATH/flink
 ENV PATH $PATH:$FLINK_HOME/bin
 
+# These can be mapped from the host to the container using
+# $ docker run -t flink -p 8081:8081 -p 6123:6123 jobmanager
+EXPOSE 8081
+EXPOSE 6123
+
 # Install build dependencies and flink
 RUN set -x && \
   mkdir -p $FLINK_INSTALL_PATH && \

http://git-wip-us.apache.org/repos/asf/flink/blob/6e1e1395/flink-contrib/docker-flink/docker-compose-bluemix.yml
----------------------------------------------------------------------
diff --git a/flink-contrib/docker-flink/docker-compose-bluemix.yml b/flink-contrib/docker-flink/docker-compose-bluemix.yml
index 0155f01..b667a0d 100644
--- a/flink-contrib/docker-flink/docker-compose-bluemix.yml
+++ b/flink-contrib/docker-flink/docker-compose-bluemix.yml
@@ -40,3 +40,5 @@ services:
     command: taskmanager
     links:
       - "jobmanager:jobmanager"
+    environment:
+      - JOB_MANAGER_RPC_ADDRESS="jobmanager"

http://git-wip-us.apache.org/repos/asf/flink/blob/6e1e1395/flink-contrib/docker-flink/docker-compose.yml
----------------------------------------------------------------------
diff --git a/flink-contrib/docker-flink/docker-compose.yml b/flink-contrib/docker-flink/docker-compose.yml
index eaa30a6..6a13353 100644
--- a/flink-contrib/docker-flink/docker-compose.yml
+++ b/flink-contrib/docker-flink/docker-compose.yml
@@ -26,6 +26,8 @@ services:
     ports:
       - "48081:8081"
     command: jobmanager
+    environment:
+      - JOB_MANAGER_RPC_ADDRESS="jobmanager"
 
   taskmanager:
     image: flink
@@ -37,3 +39,5 @@ services:
     command: taskmanager
     links:
       - "jobmanager:jobmanager"
+    environment:
+      - JOB_MANAGER_RPC_ADDRESS="jobmanager"

http://git-wip-us.apache.org/repos/asf/flink/blob/6e1e1395/flink-contrib/docker-flink/docker-entrypoint.sh
----------------------------------------------------------------------
diff --git a/flink-contrib/docker-flink/docker-entrypoint.sh b/flink-contrib/docker-flink/docker-entrypoint.sh
index a2ca0e9..780ce38 100755
--- a/flink-contrib/docker-flink/docker-entrypoint.sh
+++ b/flink-contrib/docker-flink/docker-entrypoint.sh
@@ -18,20 +18,19 @@
 # limitations under the License.
 ################################################################################
 
-if [ "$1" = "jobmanager" ]; then
-    echo "Starting Job Manager"
-    #sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: `hostname -f`/g" $FLINK_HOME/conf/flink-conf.yaml
+### If unspecified, the hostname of the container is taken as the JobManager address
+JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-`hostname -f`}
+###
 
-    # make use of container linking and exploit the jobmanager entry in /etc/hosts
-    sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: jobmanager/g" $FLINK_HOME/conf/flink-conf.yaml
+if [ "$1" == "jobmanager" ]; then
+    echo "Starting Job Manager"
+    sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" $FLINK_HOME/conf/flink-conf.yaml
 
     echo "config file: " && grep '^[^\n#]' $FLINK_HOME/conf/flink-conf.yaml
     $FLINK_HOME/bin/jobmanager.sh start cluster
-elif [ "$1" = "taskmanager" ]; then
-
-    # make use of container linking and exploit the jobmanager entry in /etc/hosts
-    sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: jobmanager/g" $FLINK_HOME/conf/flink-conf.yaml
+elif [ "$1" == "taskmanager" ]; then
 
+    sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" $FLINK_HOME/conf/flink-conf.yaml
     sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: `grep -c ^processor /proc/cpuinfo`/g" $FLINK_HOME/conf/flink-conf.yaml
 
     echo "Starting Task Manager"
@@ -40,3 +39,6 @@ elif [ "$1" = "taskmanager" ]; then
 else
     $@
 fi
+
+# prevent script to exit
+tail -f /dev/null


[2/7] flink git commit: [maven] properly attach the CEP Scala source code

Posted by mx...@apache.org.
[maven] properly attach the CEP Scala source code

Two options, either change the default Maven source directory from
'src/main/java' to 'src/main/scala' or use the build-helper-maven-plugin
to attach the Scala sources. Opting for both here to be in lines with
Maven standards and support Eclipse.

This closes #2908.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e9e6688e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e9e6688e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e9e6688e

Branch: refs/heads/master
Commit: e9e6688e0e8c1febb6ee006f710d5eeab14f2a91
Parents: 0506a63
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Nov 30 12:20:40 2016 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Dec 16 17:51:59 2016 +0100

----------------------------------------------------------------------
 flink-libraries/flink-cep-scala/pom.xml | 37 ++++++++++++++++++++++++++++
 1 file changed, 37 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e9e6688e/flink-libraries/flink-cep-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/pom.xml b/flink-libraries/flink-cep-scala/pom.xml
index 2aef4ec..a613d44 100644
--- a/flink-libraries/flink-cep-scala/pom.xml
+++ b/flink-libraries/flink-cep-scala/pom.xml
@@ -93,6 +93,8 @@ under the License.
     </dependencies>
 
     <build>
+        <!-- Not strictly necessary because of the build-helper-maven-plugin below. -->
+        <sourceDirectory>src/main/scala</sourceDirectory>
         <plugins>
             <!-- Scala Compiler -->
             <plugin>
@@ -116,6 +118,41 @@ under the License.
 					<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
 				</configuration>
 			</plugin>
+
+			<!-- Adding scala source directories to build path for Eclipse and generating the sources jar -->
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>build-helper-maven-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<!-- Add src/main/scala to eclipse build path -->
+					<execution>
+						<id>add-source</id>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>add-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/main/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+					<!-- Add src/test/scala to eclipse build path -->
+					<execution>
+						<id>add-test-source</id>
+						<phase>generate-test-sources</phase>
+						<goals>
+							<goal>add-test-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/test/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
         </plugins>
     </build>
     


[7/7] flink git commit: [FLINK-5350] don't overwrite an existing JAAS config

Posted by mx...@apache.org.
[FLINK-5350] don't overwrite an existing JAAS config

Users may want to use SASL/PLAIN https://tools.ietf.org/html/rfc4616
without Kerberos enabled.

Skip security configuration if no Kerberos credentials are available.

This closes #3017.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0506a63c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0506a63c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0506a63c

Branch: refs/heads/master
Commit: 0506a63c8a7e50a0eaf66cd0bbec42e2fac5017c
Parents: becd270
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Dec 15 15:29:21 2016 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Dec 16 17:51:59 2016 +0100

----------------------------------------------------------------------
 .../runtime/security/HadoopSecurityContext.java |  5 ++--
 .../flink/runtime/security/SecurityUtils.java   | 30 ++++++++++++++------
 .../runtime/security/SecurityUtilsTest.java     | 23 +++++++++++++++
 3 files changed, 46 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0506a63c/flink-runtime/src/main/java/org/apache/flink/runtime/security/HadoopSecurityContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/HadoopSecurityContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/HadoopSecurityContext.java
index ea6e5e3..c70f00b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/HadoopSecurityContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/HadoopSecurityContext.java
@@ -25,9 +25,8 @@ import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.Callable;
 
 /*
- * Process-wide security context object which initializes UGI with appropriate security credentials and also it
- * creates in-memory JAAS configuration object which will serve appropriate ApplicationConfigurationEntry for the
- * connector login module implementation that authenticates Kerberos identity using SASL/JAAS based mechanism.
+ * Hadoop security context which runs a Callable with the previously
+ * initialized UGI and appropriate security credentials.
  */
 class HadoopSecurityContext implements SecurityContext {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0506a63c/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
index 7416cc6..d7fc6ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
@@ -64,12 +64,16 @@ public class SecurityUtils {
 	public static SecurityContext getInstalledContext() { return installedContext; }
 
 	/**
-	 * Performs a static initialization of the JAAS and Hadoop UGI security mechanism
+	 * Performs a static initialization of the JAAS and Hadoop UGI security mechanism.
+	 * It creates the in-memory JAAS configuration object which will serve appropriate
+	 * ApplicationConfigurationEntry for the connector login module implementation that
+	 * authenticates Kerberos identity using SASL/JAAS based mechanism.
 	 */
 	public static void install(SecurityConfiguration config) throws Exception {
 
-		if (!(installedContext instanceof NoOpSecurityContext)) {
-			LOG.warn("overriding previous security context");
+		if (!config.securityIsEnabled()) {
+			// do not perform any initialization if no Kerberos crendetails are provided
+			return;
 		}
 
 		// establish the JAAS config
@@ -151,10 +155,18 @@ public class SecurityUtils {
 				}
 			}
 
+			if (!(installedContext instanceof NoOpSecurityContext)) {
+				LOG.warn("overriding previous security context");
+			}
+
 			installedContext = new HadoopSecurityContext(loginUser);
 		}
 	}
 
+	static void clearContext() {
+		installedContext = new NoOpSecurityContext();
+	}
+
 	/*
 	 * This method configures some of the system properties that are require for ZK and Kafka SASL authentication
 	 * See: https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289
@@ -163,11 +175,7 @@ public class SecurityUtils {
 	 * Kafka current code behavior.
 	 */
 	private static void populateSystemSecurityProperties(Configuration configuration) {
-		Preconditions.checkNotNull(configuration, "The supplied configuation was null");
-
-		//required to be empty for Kafka but we will override the property
-		//with pseudo JAAS configuration file if SASL auth is enabled for ZK
-		System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, "");
+		Preconditions.checkNotNull(configuration, "The supplied configuration was null");
 
 		boolean disableSaslClient = configuration.getBoolean(HighAvailabilityOptions.ZOOKEEPER_SASL_DISABLE);
 
@@ -203,7 +211,7 @@ public class SecurityUtils {
 		String zkSaslServiceName = configuration.getValue(HighAvailabilityOptions.ZOOKEEPER_SASL_SERVICE_NAME);
 		if (!StringUtils.isBlank(zkSaslServiceName)) {
 			LOG.info("ZK SASL service name: {} is provided in the configuration", zkSaslServiceName);
-			System.setProperty(ZOOKEEPER_SASL_CLIENT_USERNAME,zkSaslServiceName);
+			System.setProperty(ZOOKEEPER_SASL_CLIENT_USERNAME, zkSaslServiceName);
 		}
 
 	}
@@ -268,6 +276,10 @@ public class SecurityUtils {
 			}
 
 		}
+
+		public boolean securityIsEnabled() {
+			return keytab != null && principal != null;
+		}
 	}
 
 	// Just a util, shouldn't be instantiated.

http://git-wip-us.apache.org/repos/asf/flink/blob/0506a63c/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
index ecb89e0..1d38899 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
@@ -19,6 +19,8 @@ package org.apache.flink.runtime.security;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.lang.reflect.Method;
@@ -31,6 +33,12 @@ import static org.junit.Assert.fail;
  */
 public class SecurityUtilsTest {
 
+	@AfterClass
+	public static void afterClass() {
+		SecurityUtils.clearContext();
+		System.setProperty(SecurityUtils.JAVA_SECURITY_AUTH_LOGIN_CONFIG, "");
+	}
+
 	@Test
 	public void testCreateInsecureHadoopCtx() {
 		SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(new Configuration());
@@ -51,6 +59,21 @@ public class SecurityUtilsTest {
 		}
 	}
 
+	@Test
+	/**
+	 * The Jaas configuration file provided should not be overridden.
+	 */
+	public void testJaasPropertyOverride() throws Exception {
+		String confFile = "jaas.conf";
+		System.setProperty(SecurityUtils.JAVA_SECURITY_AUTH_LOGIN_CONFIG, confFile);
+
+		SecurityUtils.install(new SecurityUtils.SecurityConfiguration(new Configuration()));
+
+		Assert.assertEquals(
+			confFile,
+			System.getProperty(SecurityUtils.JAVA_SECURITY_AUTH_LOGIN_CONFIG));
+	}
+
 
 	private String getOSUserName() throws Exception {
 		String userName = "";


[3/7] flink git commit: [FLINK-4922][docs] document how to use Flink on Mesos

Posted by mx...@apache.org.
[FLINK-4922][docs] document how to use Flink on Mesos

This closes #3007.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9da0b0e4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9da0b0e4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9da0b0e4

Branch: refs/heads/master
Commit: 9da0b0e49e10bd16b8644f61fc863627135b51f6
Parents: 6e1e139
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Dec 14 15:14:50 2016 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Dec 16 17:51:59 2016 +0100

----------------------------------------------------------------------
 docs/setup/mesos.md | 244 +++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 244 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9da0b0e4/docs/setup/mesos.md
----------------------------------------------------------------------
diff --git a/docs/setup/mesos.md b/docs/setup/mesos.md
new file mode 100644
index 0000000..0c7e804
--- /dev/null
+++ b/docs/setup/mesos.md
@@ -0,0 +1,244 @@
+---
+title:  "Mesos Setup"
+nav-title: Mesos
+nav-parent_id: deployment
+nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Background
+
+The Mesos implementation consists of two components: The Application Master and
+the Worker. The workers are simple TaskManagers which are parameterized by the environment
+set up by the application master. The most sophisticated component of the Mesos
+implementation is the application master. The application master currently hosts
+the following components:
+
+### Mesos Scheduler 
+
+The scheduler is responsible for registering the framework with Mesos,
+requesting resources, and launching worker nodes. The scheduler continuously
+needs to report back to Mesos to ensure the framework is in a healthy state. To
+verify the health of the cluster, the scheduler monitors the spawned workers and
+marks them as failed and restarts them if necessary.
+
+Flink's Mesos scheduler itself is currently not highly available. However, it
+persists all necessary information about its state (e.g. configuration, list of
+workers) in Zookeeper. In the presence of a failure, it relies on an external
+system to bring up a new scheduler. The scheduler will then register with Mesos
+again and go through the reconciliation phase. In the reconciliation phase, the
+scheduler receives a list of running worker nodes. It matches these against the
+recovered information from Zookeeper and makes sure to bring back the cluster in
+the state before the failure.
+
+### Artifact Server
+
+The artifact server is responsible for providing resources to the worker
+nodes. The resources can be anything from the Flink binaries to shared secrets
+or configuration files. For instance, in non-containerized environments, the
+artifact server will provide the Flink binaries. What files will be served
+depends on the configuration overlay used. 
+
+### Flink's JobManager and Web Interface
+
+The Mesos scheduler currently resides with the JobManager but will be started
+independently of the JobManager in future versions (see
+[FLIP-6](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077)). The
+proposed changes will also add a Dispatcher component which will be the central
+point for job submission and monitoring.
+
+### Startup script and configuration overlays
+
+The startup script provides a way to configure and start the application
+master. All further configuration is then inherited by the worker nodes. This
+is achieved using configuration overlays. Configuration overlays provide a way
+to infer configuration from environment variables and config files which are
+shipped to the worker nodes.
+
+
+## DC/OS
+
+This section refers to [DC/OS](https://dcos.io) which is a Mesos distribution
+with a sophisticated application management layer. It comes pre-installed with
+Marathon, a service to supervise applications and maintain their state in case
+of failures.
+
+If you don't have a running DC/OS cluster, please follow the
+[instructions on how to install DC/OS on the official website](https://dcos.io/install/).
+
+Once you have a DC/OS cluster, you may install Flink through the DC/OS
+Universe. In the search prompt, just search for Flink. 
+
+**Note**: At the time of this writing, Flink was not yet available in the
+Universe. Please use the following workaround in the meantime:
+
+1. Add the Development Universe
+
+    `dcos marathon app add https://raw.githubusercontent.com/mesosphere/dcos-flink-service/Makman2/quickstart/universe-server.json`
+    
+2. Add the local Universe repository:
+
+   `dcos package repo add --index=0 dev-universe http://universe.marathon.mesos:8085/repo`
+
+3. Install Flink through the Universe page or using the `dcos` command:
+   
+   `dcos package install flink`
+
+
+## Mesos without DC/OS
+
+Let's take a look at how to set up Flink on Mesos without DC/OS. 
+
+### Prerequisites
+
+Please follow the
+[instructions on how to setup Mesos on the official website](http://mesos.apache.org/documentation/latest/). 
+
+### Optional dependencies
+
+Optionally,
+you may also install [Marathon](https://mesosphere.github.io/marathon/) which
+will be necessary if you want your Flink cluster to be highly available in the
+presence of master node failures. Additionally, you probably want to install a
+distributed file system to share data across nodes and make use of Flink's
+checkpointing mechanism.
+
+### Pre-installing Flink vs Docker/Mesos containers
+
+You may install Flink on all of your Mesos Master and Agent nodes. You can also
+pull the binaries from the Flink web site during deployment and apply your
+custom configuration before launching the application master. A more
+convenient and easier to maintain approach is to use Docker containers to manage
+the Flink binaries and configuration.
+
+This is controlled via the following configuration entries:
+
+    mesos.resourcemanager.tasks.container.type: mesos _or_ docker
+    
+If set to 'docker', specify the image name:
+
+    mesos.resourcemanager.tasks.container.image.name: image_name
+
+
+### Standalone
+
+In the `/bin` directory of the Flink distribution, you find two startup scripts
+which manage the Flink processes in a Mesos cluster:
+
+1. mesos-appmaster.sh
+   This starts the Mesos application master which will register the Mesos
+   scheduler. It is also responsible for starting up the worker nodes.
+   
+2. mesos-taskmanager.sh
+   The entry point for the Mesos worker processes. You don't need to explicitly
+   execute this script. It is automatically launched by the Mesos worker node to
+   bring up a new TaskManager.
+
+
+### High Availability
+
+You will need to run a service like Marathon or Apache Aurora which takes care
+of restarting the Flink master process in case of node or process failures. In
+addition, Zookeeper needs to be configured like described in the
+[High Availability section of the Flink docs]({{ site.baseurl }}/setup/jobmanager_high_availability.html)
+
+For the reconciliation of tasks to work correctly, please also set
+`recovery.zookeeper.path.mesos-workers` to a valid Zookeeper path.
+
+#### Marathon
+
+Marathon needs to be set up to launch the `bin/mesos-appmaster.sh` script. In
+particular, it should also adjust any configuration parameters for the Flink
+cluster. 
+
+Here is an example configuration for Marathon:
+
+    {
+    "id": "basic-0",
+    "cmd": "$FLINK_HOME/bin/mesos-appmaster.sh -DconfigEntry=configValue -DanotherEntry=anotherValue ...",
+    "cpus": 1.0,
+    "mem": 2048
+    }
+
+### Configuration parameters
+
+#### Mesos configuration entries
+
+
+`mesos.initial-tasks`: The initial workers to bring up when the master
+    starts (**DEFAULT**: The number of workers specified at cluster startup).
+
+`mesos.maximum-failed-tasks`: The maximum number of failed workers before
+    the cluster fails (**DEFAULT**: Number of initial workers) May be set to -1
+    to disable this feature.
+    
+`mesos.master`: The Mesos master URL. The value should be in one of the
+	 following forms: host:port, zk://host1:port1,host2:port2,.../path,
+	 zk://username:password@host1:port1,host2:port2,.../path,
+	 file:///path/to/file (where file contains one of the above)
+     
+`mesos.failover-timeout`: The failover timeout in seconds for the Mesos scheduler, after
+	 which running tasks are automatically shut down (**DEFAULT:** 600).
+
+`mesos.resourcemanager.artifactserver.port`: The config parameter defining the Mesos artifact server port to use. Setting the port to 0 will let the OS choose an available port.
+
+`mesos.resourcemanager.framework.name`: Mesos framework name (**DEFAULT:** Flink)
+
+`mesos.resourcemanager.framework.role`: Mesos framework role definition (**DEFAULT:** *)
+
+`recovery.zookeeper.path.mesos-workers`: The ZooKeeper root path for persisting the Mesos worker information.
+
+`mesos.resourcemanager.framework.principal`: Mesos framework principal (**NO DEFAULT**)
+
+`mesos.resourcemanager.framework.secret`: Mesos framework secret (**NO DEFAULT**)
+
+`mesos.resourcemanager.framework.user`: Mesos framework user (**DEFAULT:**"")
+
+`mesos.resourcemanager.artifactserver.ssl.enabled`: Enables SSL for the Flink
+artifact server (**DEFAULT**: true). Note that `security.ssl.enabled` also needs
+to be set to `true` to enable encryption.
+
+`mesos.resourcemanager.tasks.mem`: Memory to assign to the Mesos workers in MB (**DEFAULT**: 1024)
+
+`mesos.resourcemanager.tasks.cpus`: CPUs to assign to the Mesos workers (**DEFAULT**: 0.0)
+
+`mesos.resourcemanager.tasks.container.type`: Type of the containerization used: "mesos" or "docker" (DEFAULT: mesos);
+
+`mesos.resourcemanager.tasks.container.image.name`: Image name to use for the container (**NO DEFAULT**)
+
+
+#### General configuration
+
+It is possible to completely parameterize a Mesos application through Java
+properties passed to the Mesos application master. This also allows to specify
+general Flink configuration parameters. For example:
+
+    bin/mesos-appmaster.sh \
+        -Djobmanager.heap.mb=1024 \
+        -Djobmanager.rpc.port=6123 \
+        -Djobmanager.web.port=8081 \
+        -Dmesos.initial-tasks=10 \
+        -Dmesos.resourcemanager.tasks.mem=4096 \
+        -Dtaskmanager.heap.mb=3500 \
+        -Dtaskmanager.numberOfTaskSlots=2 \
+        -Dparallelism.default=10


[5/7] flink git commit: [FLINK-5091] cleanup unused Mesos configuration entries

Posted by mx...@apache.org.
[FLINK-5091] cleanup unused Mesos configuration entries


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/becd2702
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/becd2702
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/becd2702

Branch: refs/heads/master
Commit: becd27029af97ccb6246df5e6f31342dac390b22
Parents: 9da0b0e
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Dec 14 15:35:30 2016 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Dec 16 17:51:59 2016 +0100

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    | 33 +-------------------
 .../clusterframework/LaunchableMesosWorker.java |  1 +
 .../MesosTaskManagerParameters.java             | 13 ++++++--
 3 files changed, 13 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/becd2702/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index a515c33..8a9d594 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -492,40 +492,10 @@ public final class ConfigConstants {
 	public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_USER = "mesos.resourcemanager.framework.user";
 
 	/**
-	 * The cpus to acquire from Mesos.
-	 *
-	 * By default, we use the number of requested task slots.
-	 */
-	public static final String MESOS_RESOURCEMANAGER_TASKS_CPUS = "mesos.resourcemanager.tasks.cpus";
-
-	/**
-	 * The container image to use for task managers.
-	 */
-	public static final String MESOS_RESOURCEMANAGER_TASKS_CONTAINER_IMAGE_NAME =
-		"mesos.resourcemanager.tasks.container.image.name";
-
-	/**
 	 * Config parameter to override SSL support for the Artifact Server
 	 */
 	public static final String MESOS_ARTIFACT_SERVER_SSL_ENABLED = "mesos.resourcemanager.artifactserver.ssl.enabled";
 
-	/**
-	 * The type of container to use for task managers. Valid values are
-	 * {@code MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_MESOS} or
-	 * {@code MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_DOCKER}.
-	 */
-	public static final String MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE =
-		"mesos.resourcemanager.tasks.container.type";
-
-	/**
-	 * Value for {@code MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE} setting. Tells to use the Mesos containerizer.
-	 */
-	public static final String MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_MESOS = "mesos";
-	/**
-	 * Value for {@code MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE} setting. Tells to use the Docker containerizer.
-	 */
-	public static final String MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_DOCKER = "docker";
-
 	// ------------------------ Hadoop Configuration ------------------------
 
 	/**
@@ -1177,6 +1147,7 @@ public final class ConfigConstants {
 	public static final String DEFAULT_YARN_JOB_MANAGER_PORT = "0";
 
 	// ------ Mesos-Specific Configuration ------
+	// For more configuration entries please see {@code MesosTaskManagerParameters}.
 
 	/** The default failover timeout provided to Mesos (10 mins) */
 	public static final int DEFAULT_MESOS_FAILOVER_TIMEOUT_SECS = 10 * 60;
@@ -1198,8 +1169,6 @@ public final class ConfigConstants {
 	/** Default value to override SSL support for the Artifact Server */
 	public static final boolean DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED = true;
 
-	public static final String DEFAULT_MESOS_RESOURCEMANAGER_TASKS_CONTAINER_IMAGE_TYPE = "mesos";
-
 	// ------------------------ File System Behavior ------------------------
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/becd2702/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index c6e51f1..bfe9be8 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -63,6 +63,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
 
 	/**
 	 * Construct a launchable Mesos worker.
+	 * @param resolver The resolver for retrieving artifacts (e.g. jars, configuration)
 	 * @param params the TM parameters such as memory, cpu to acquire.
 	 * @param containerSpec an abstract container specification for launch time.
 	 * @param taskID the taskID for this worker.

http://git-wip-us.apache.org/repos/asf/flink/blob/becd2702/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
index 7fae58c..044bffe 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -56,6 +56,15 @@ public class MesosTaskManagerParameters {
 		key("mesos.resourcemanager.tasks.container.image.name")
 			.noDefaultValue();
 
+	/**
+	 * Value for {@code MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE} setting. Tells to use the Mesos containerizer.
+	 */
+	public static final String MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_MESOS = "mesos";
+	/**
+	 * Value for {@code MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE} setting. Tells to use the Docker containerizer.
+	 */
+	public static final String MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_DOCKER = "docker";
+
 	private final double cpus;
 
 	private final ContainerType containerType;
@@ -139,10 +148,10 @@ public class MesosTaskManagerParameters {
 		ContainerType containerType;
 		String containerTypeString = flinkConfig.getString(MESOS_RM_CONTAINER_TYPE);
 		switch(containerTypeString) {
-			case ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_MESOS:
+			case MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_MESOS:
 				containerType = ContainerType.MESOS;
 				break;
-			case ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_DOCKER:
+			case MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_DOCKER:
 				containerType = ContainerType.DOCKER;
 				if(imageName == null || imageName.length() == 0) {
 					throw new IllegalConfigurationException(MESOS_RM_CONTAINER_IMAGE_NAME.key() +


[6/7] flink git commit: [FLINK-2821] use custom Akka build to listen on all interfaces

Posted by mx...@apache.org.
[FLINK-2821] use custom Akka build to listen on all interfaces

This uses Flakka (a custom Akka 2.3 build) to resolve the issue that
the bind address needs to be matching the external address of the
JobManager. With the changes applied, we can now bind to all
interfaces, e.g. via 0.0.0.0 (IPv4) or :: (IPv6).

For this to work properly, the configuration entry
JOB_MANAGER_IPC_ADDRESS now represents the external address of the
JobManager. Consequently, it should not be resolved to an IP address
anymore because it may not be resolvable from within containered
environments. Akka treats this address as the logical address. Any
messages which are not tagged with this address will be received by
the Actor System (because we listen on all interfaces) but will be
dropped subsequently. In addition, we need the external address for
the JobManager to be able to publish it to Zookeeper for HA setups.

Flakka: https://github.com/mxm/flakka
Patch applied: https://github.com/akka/akka/pull/15610

- convert host to lower case
- use consistent format for IPv6 address
- adapt config and test cases
- adapt documentation to clarify the address config entry
- TaskManager: resolve the initial hostname of the StandaloneLeaderRetrievalService

This closes #2917.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/27ebdf7a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/27ebdf7a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/27ebdf7a

Branch: refs/heads/master
Commit: 27ebdf7acde0e2c3ad183503d0588ca91e63d729
Parents: e9e6688
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Nov 16 15:50:01 2016 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Dec 16 17:51:59 2016 +0100

----------------------------------------------------------------------
 docs/setup/config.md                            |  4 +-
 flink-clients/pom.xml                           |  4 +-
 .../flink/client/program/ClusterClient.java     |  2 +-
 .../RemoteExecutorHostnameResolutionTest.java   | 23 ++---
 ...rRetrievalServiceHostnameResolutionTest.java | 37 ++++----
 .../org/apache/flink/storm/api/FlinkClient.java |  4 +-
 .../java/org/apache/flink/util/NetUtils.java    | 79 +++++++++++++++-
 .../org/apache/flink/util/NetUtilsTest.java     | 59 ++++++++++++
 flink-dist/src/main/resources/flink-conf.yaml   | 14 ++-
 flink-mesos/pom.xml                             | 16 ++--
 .../main/resources/archetype-resources/pom.xml  |  6 +-
 .../main/resources/archetype-resources/pom.xml  |  6 +-
 flink-runtime-web/pom.xml                       |  4 +-
 flink-runtime/pom.xml                           | 16 ++--
 .../jobmanager/JobManagerCliOptions.java        |  2 +-
 .../runtime/util/LeaderRetrievalUtils.java      | 16 +++-
 .../flink/runtime/util/StandaloneUtils.java     | 42 ++++++---
 .../apache/flink/runtime/akka/AkkaUtils.scala   | 72 ++++++++------
 .../flink/runtime/jobmanager/JobManager.scala   | 98 ++++++++------------
 .../runtime/minicluster/FlinkMiniCluster.scala  |  2 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  5 +-
 .../JobManagerProcessReapingTest.java           |  4 +-
 .../jobmanager/JobManagerStartupTest.java       |  4 +-
 .../ZooKeeperLeaderRetrievalTest.java           |  8 +-
 .../TaskManagerProcessReapingTestBase.java      |  6 +-
 .../taskmanager/TaskManagerStartupTest.java     |  7 +-
 .../runtime/testutils/JobManagerProcess.java    |  4 +-
 .../flink/runtime/akka/AkkaUtilsTest.scala      |  5 +-
 .../jobmanager/JobManagerConnectionTest.scala   |  4 +-
 flink-tests/pom.xml                             |  4 +-
 flink-yarn-tests/pom.xml                        |  4 +-
 .../flink/yarn/YARNHighAvailabilityITCase.java  |  2 +-
 flink-yarn/pom.xml                              | 16 ++--
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |  2 +-
 pom.xml                                         | 30 +++---
 35 files changed, 390 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 89e8207..680f4f7 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -48,7 +48,7 @@ The configuration files for the TaskManagers can be different, Flink does not as
 
 - `env.java.opts.taskmanager`: TaskManager-specific JVM options. These are used in addition to the regular `env.java.opts`. This configuration option is ignored by the YARN client.
 
-- `jobmanager.rpc.address`: The IP address of the JobManager, which is the master/coordinator of the distributed system (DEFAULT: localhost).
+- `jobmanager.rpc.address`: The external address of the JobManager, which is the master/coordinator of the distributed system (DEFAULT: localhost). **Note:** The address (host name or IP) should be accessible by all nodes including the client.
 
 - `jobmanager.rpc.port`: The port number of the JobManager (DEFAULT: 6123).
 
@@ -206,7 +206,7 @@ These parameters configure the default HDFS used by Flink. Setups that do not sp
 
 The following parameters configure Flink's JobManager and TaskManagers.
 
-- `jobmanager.rpc.address`: The IP address of the JobManager, which is the master/coordinator of the distributed system (DEFAULT: **localhost**).
+- `jobmanager.rpc.address`: The external address of the JobManager, which is the master/coordinator of the distributed system (DEFAULT: **localhost**). **Note:** The address (host name or IP) should be accessible by all nodes including the client.
 
 - `jobmanager.rpc.port`: The port number of the JobManager (DEFAULT: **6123**).
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-clients/pom.xml
----------------------------------------------------------------------
diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml
index 5639ed6..14b5461 100644
--- a/flink-clients/pom.xml
+++ b/flink-clients/pom.xml
@@ -78,8 +78,8 @@ under the License.
 		</dependency>
 		
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-testkit_${scala.binary.version}</artifactId>
+			<groupId>com.data-artisans</groupId>
+			<artifactId>flakka-testkit_${scala.binary.version}</artifactId>
 			<scope>test</scope>
 		</dependency>
 	</dependencies>

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 34a9197..8d0e841 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -413,7 +413,7 @@ public abstract class ClusterClient {
 
 		final LeaderRetrievalService leaderRetrievalService;
 		try {
-			leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
+			leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig, true);
 		} catch (Exception e) {
 			throw new ProgramInvocationException("Could not create the leader retrieval service", e);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
index fb5200b..07edb3a 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.net.InetAddress;
@@ -39,15 +40,17 @@ public class RemoteExecutorHostnameResolutionTest {
 
 	private static final String nonExistingHostname = "foo.bar.com.invalid";
 	private static final int port = 14451;
-	
-	
+
+	@BeforeClass
+	public static void check() {
+		checkPreconditions();
+	}
+
 	@Test
 	public void testUnresolvableHostname1() {
-		
-		checkPreconditions();
-		
+
+		RemoteExecutor exec = new RemoteExecutor(nonExistingHostname, port);
 		try {
-			RemoteExecutor exec = new RemoteExecutor(nonExistingHostname, port);
 			exec.executePlan(getProgram());
 			fail("This should fail with an ProgramInvocationException");
 		}
@@ -65,12 +68,10 @@ public class RemoteExecutorHostnameResolutionTest {
 	@Test
 	public void testUnresolvableHostname2() {
 
-		checkPreconditions();
-		
-		try {
-			InetSocketAddress add = new InetSocketAddress(nonExistingHostname, port);
-			RemoteExecutor exec = new RemoteExecutor(add, new Configuration(),
+		InetSocketAddress add = new InetSocketAddress(nonExistingHostname, port);
+		RemoteExecutor exec = new RemoteExecutor(add, new Configuration(),
 				Collections.<URL>emptyList(), Collections.<URL>emptyList());
+		try {
 			exec.executePlan(getProgram());
 			fail("This should fail with an ProgramInvocationException");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
index ee26145..dd7d8bc 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
@@ -22,27 +22,34 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.util.TestLogger;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 
 import static org.junit.Assert.*;
-import static org.junit.Assume.*;
+import static org.junit.Assume.assumeTrue;
 
 /**
  * Tests that verify that the LeaderRetrievalSevice correctly handles non-resolvable host names
  * and does not fail with another exception
  */
 public class LeaderRetrievalServiceHostnameResolutionTest extends TestLogger {
-	
+
 	private static final String nonExistingHostname = "foo.bar.com.invalid";
-	
+
+	@BeforeClass
+	public static void check() {
+		checkPreconditions();
+	}
+
+	/*
+	 * Tests that the StandaloneLeaderRetrievalService resolves host names if specified.
+	 */
 	@Test
 	public void testUnresolvableHostname1() {
-		
-		checkPreconditions();
-		
+
 		try {
 			Configuration config = new Configuration();
 
@@ -50,30 +57,28 @@ public class LeaderRetrievalServiceHostnameResolutionTest extends TestLogger {
 			config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 17234);
 
 			LeaderRetrievalUtils.createLeaderRetrievalService(config);
-			fail("This should fail with an UnknownHostException");
-		}
-		catch (UnknownHostException e) {
-			// that is what we want!
 		}
 		catch (Exception e) {
-			System.err.println("Wrong exception!");
+			System.err.println("Shouldn't throw an exception!");
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
 	}
 
+	/*
+	 * Tests that the StandaloneLeaderRetrievalService does not resolve host names by default.
+	 */
 	@Test
 	public void testUnresolvableHostname2() {
 
-		checkPreconditions();
-		
 		try {
 			Configuration config = new Configuration();
+
 			config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, nonExistingHostname);
 			config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 17234);
 
-			LeaderRetrievalUtils.createLeaderRetrievalService(config);
-			fail("This should fail with an UnknownHostException");
+			LeaderRetrievalUtils.createLeaderRetrievalService(config, true);
+			fail("This should fail with an IllegalConfigurationException");
 		}
 		catch (UnknownHostException e) {
 			// that is what we want!
@@ -84,7 +89,7 @@ public class LeaderRetrievalServiceHostnameResolutionTest extends TestLogger {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	private static void checkPreconditions() {
 		// the test can only work if the invalid URL cannot be resolves
 		// some internet providers resolve unresolvable URLs to navigational aid servers,

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
index 9f47d60..6019aa3 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -53,6 +53,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
 import org.apache.flink.storm.util.StormConfig;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 
+import org.apache.flink.util.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,7 +64,6 @@ import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URL;
 import java.util.Collections;
@@ -333,7 +333,7 @@ public class FlinkClient {
 		}
 
 		return JobManager.getJobManagerActorRef(AkkaUtils.getAkkaProtocol(configuration),
-				new InetSocketAddress(this.jobManagerHost, this.jobManagerPort),
+				NetUtils.unresolvedHostAndPortToNormalizedString(this.jobManagerHost, this.jobManagerPort),
 				actorSystem, AkkaUtils.getLookupTimeout(configuration));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
index 6f63eb4..d4437e4 100644
--- a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
@@ -20,8 +20,10 @@ package org.apache.flink.util;
 
 import org.apache.flink.annotation.Internal;
 
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import sun.net.util.IPAddressUtil;
 
 import java.io.IOException;
 import java.net.Inet4Address;
@@ -40,6 +42,9 @@ import java.util.Iterator;
 public class NetUtils {
 
 	private static final Logger LOG = LoggerFactory.getLogger(NetUtils.class);
+
+	/** The wildcard address to listen on all interfaces (either 0.0.0.0 or ::) */
+	private static final String WILDCARD_ADDRESS = new InetSocketAddress(0).getAddress().getHostAddress();
 	
 	/**
 	 * Turn a fully qualified domain name (fqdn) into a hostname. If the fqdn has multiple subparts
@@ -111,7 +116,55 @@ public class NetUtils {
 	// ------------------------------------------------------------------------
 	//  Encoding of IP addresses for URLs
 	// ------------------------------------------------------------------------
-	
+
+	/**
+	 * Returns an address in a normalized format for Akka.
+	 * When an IPv6 address is specified, it normalizes the IPv6 address to avoid
+	 * complications with the exact URL match policy of Akka.
+	 * @param host The hostname, IPv4 or IPv6 address
+	 * @return host which will be normalized if it is an IPv6 address
+	 */
+	public static String unresolvedHostToNormalizedString(String host) {
+		// Return loopback interface address if host is null
+		// This represents the behavior of {@code InetAddress.getByName } and RFC 3330
+		if (host == null) {
+			host = InetAddress.getLoopbackAddress().getHostAddress();
+		} else {
+			host = host.trim().toLowerCase();
+		}
+
+		// normalize and validate the address
+		if (IPAddressUtil.isIPv6LiteralAddress(host)) {
+			byte[] ipV6Address = IPAddressUtil.textToNumericFormatV6(host);
+			host = getIPv6UrlRepresentation(ipV6Address);
+		} else if (!IPAddressUtil.isIPv4LiteralAddress(host)) {
+			try {
+				// We don't allow these in hostnames
+				Preconditions.checkArgument(!host.startsWith("."));
+				Preconditions.checkArgument(!host.endsWith("."));
+				Preconditions.checkArgument(!host.contains(":"));
+			} catch (Exception e) {
+				throw new IllegalConfigurationException("The configured hostname is not valid", e);
+			}
+		}
+
+		return host;
+	}
+
+	/**
+	 * Returns a valid address for Akka. It returns a String of format 'host:port'.
+	 * When an IPv6 address is specified, it normalizes the IPv6 address to avoid
+	 * complications with the exact URL match policy of Akka.
+	 * @param host The hostname, IPv4 or IPv6 address
+	 * @param port The port
+	 * @return host:port where host will be normalized if it is an IPv6 address
+	 */
+	public static String unresolvedHostAndPortToNormalizedString(String host, int port) {
+		Preconditions.checkArgument(port >= 0 && port < 65536,
+			"Port is not within the valid range.");
+		return unresolvedHostToNormalizedString(host) + ":" + port;
+	}
+
 	/**
 	 * Encodes an IP address properly as a URL string. This method makes sure that IPv6 addresses
 	 * have the proper formatting to be included in URLs.
@@ -137,7 +190,7 @@ public class NetUtils {
 	/**
 	 * Encodes an IP address and port to be included in URL. in particular, this method makes
 	 * sure that IPv6 addresses have the proper formatting to be included in URLs.
-	 * 
+	 *
 	 * @param address The address to be included in the URL.
 	 * @param port The port for the URL address.
 	 * @return The proper URL string encoded IP address and port.
@@ -176,14 +229,24 @@ public class NetUtils {
 
 	/**
 	 * Creates a compressed URL style representation of an Inet6Address.
-	 * 
+	 *
 	 * <p>This method copies and adopts code from Google's Guava library.
 	 * We re-implement this here in order to reduce dependency on Guava.
 	 * The Guava library has frequently caused dependency conflicts in the past.
 	 */
 	private static String getIPv6UrlRepresentation(Inet6Address address) {
+		return getIPv6UrlRepresentation(address.getAddress());
+	}
+
+	/**
+	 * Creates a compressed URL style representation of an Inet6Address.
+	 *
+	 * <p>This method copies and adopts code from Google's Guava library.
+	 * We re-implement this here in order to reduce dependency on Guava.
+	 * The Guava library has frequently caused dependency conflicts in the past.
+	 */
+	private static String getIPv6UrlRepresentation(byte[] addressBytes) {
 		// first, convert bytes to 16 bit chunks
-		byte[] addressBytes = address.getAddress();
 		int[] hextets = new int[8];
 		for (int i = 0; i < hextets.length; i++) {
 			hextets[i] = (addressBytes[2 * i] & 0xFF) << 8 | (addressBytes[2 * i + 1] & 0xFF);
@@ -309,6 +372,14 @@ public class NetUtils {
 		return null;
 	}
 
+	/**
+	 * Returns the wildcard address to listen on all interfaces.
+	 * @return Either 0.0.0.0 or :: depending on the IP setup.
+	 */
+	public static String getWildcardIPAddress() {
+		return WILDCARD_ADDRESS;
+	}
+
 	public interface SocketFactory {
 		ServerSocket createSocket(int port) throws IOException;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
index 72cc89e..03b21dd 100644
--- a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
@@ -23,6 +23,7 @@ import org.junit.Test;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
@@ -162,4 +163,62 @@ public class NetUtilsTest {
 		error = null;
 
 	}
+
+	@Test
+	public void testFormatAddress() throws UnknownHostException {
+		{
+			// IPv4
+			String host = "1.2.3.4";
+			int port = 42;
+			Assert.assertEquals(host + ":" + port, NetUtils.unresolvedHostAndPortToNormalizedString(host, port));
+		}
+		{
+			// IPv6
+			String host = "2001:0db8:85a3:0000:0000:8a2e:0370:7334";
+			int port = 42;
+			Assert.assertEquals("[2001:db8:85a3::8a2e:370:7334]:" + port, NetUtils.unresolvedHostAndPortToNormalizedString(host, port));
+		}
+		{
+			// Hostnames
+			String host = "somerandomhostname";
+			int port = 99;
+			Assert.assertEquals(host + ":" + port, NetUtils.unresolvedHostAndPortToNormalizedString(host, port));
+		}
+		{
+			// Whitespace
+			String host = "  somerandomhostname  ";
+			int port = 99;
+			Assert.assertEquals(host.trim() + ":" + port, NetUtils.unresolvedHostAndPortToNormalizedString(host, port));
+		}
+		{
+			// Illegal hostnames
+			String host = "illegalhost.";
+			int port = 42;
+			try {
+				NetUtils.unresolvedHostAndPortToNormalizedString(host, port);
+				fail();
+			} catch (Exception ignored) {}
+			// Illegal hostnames
+			host = "illegalhost:fasf";
+			try {
+				NetUtils.unresolvedHostAndPortToNormalizedString(host, port);
+				fail();
+			} catch (Exception ignored) {}
+		}
+		{
+			// Illegal port ranges
+			String host = "1.2.3.4";
+			int port = -1;
+			try {
+				NetUtils.unresolvedHostAndPortToNormalizedString(host, port);
+				fail();
+			} catch (Exception ignored) {}
+		}
+		{
+			// lower case conversion of hostnames
+			String host = "CamelCaseHostName";
+			int port = 99;
+			Assert.assertEquals(host.toLowerCase() + ":" + port, NetUtils.unresolvedHostAndPortToNormalizedString(host, port));
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index 751acda..c650cfe 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -21,14 +21,18 @@
 # Common
 #==============================================================================
 
-# The host on which the JobManager runs. Only used in non-high-availability mode.
-# The JobManager process will use this hostname to bind the listening servers to.
-# The TaskManagers will try to connect to the JobManager on that host.
+# The external address of the host on which the JobManager runs and can be
+# reached by the TaskManagers and any clients which want to connect. This setting
+# is only used in Standalone mode and may be overwritten on the JobManager side
+# by specifying the --host <hostname> parameter of the bin/jobmanager.sh executable.
+# In high availability mode, if you use the bin/start-cluster.sh script and setup
+# the conf/masters file, this will be taken care of automatically. Yarn/Mesos
+# automatically configure the host name based on the hostname of the node where the
+# JobManager runs.
 
 jobmanager.rpc.address: localhost
 
-
-# The port where the JobManager's main actor system listens for messages.
+# The RPC port where the JobManager is reachable.
 
 jobmanager.rpc.port: 6123
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-mesos/pom.xml
----------------------------------------------------------------------
diff --git a/flink-mesos/pom.xml b/flink-mesos/pom.xml
index 55e0472..69e0c84 100644
--- a/flink-mesos/pom.xml
+++ b/flink-mesos/pom.xml
@@ -61,13 +61,13 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-actor_${scala.binary.version}</artifactId>
+			<groupId>com.data-artisans</groupId>
+			<artifactId>flakka-actor_${scala.binary.version}</artifactId>
 		</dependency>
 
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-remote_${scala.binary.version}</artifactId>
+			<groupId>com.data-artisans</groupId>
+			<artifactId>flakka-remote_${scala.binary.version}</artifactId>
 			<exclusions>
 				<!-- exclude protobuf here to allow the mesos library to provide it -->
 				<exclusion>
@@ -78,8 +78,8 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-slf4j_${scala.binary.version}</artifactId>
+			<groupId>com.data-artisans</groupId>
+			<artifactId>flakka-slf4j_${scala.binary.version}</artifactId>
 		</dependency>
 
 		<dependency>
@@ -123,8 +123,8 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-testkit_${scala.binary.version}</artifactId>
+			<groupId>com.data-artisans</groupId>
+			<artifactId>flakka-testkit_${scala.binary.version}</artifactId>
 			<scope>test</scope>
 		</dependency>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
index cf66bcf..57f3e25 100644
--- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
+++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
@@ -194,9 +194,9 @@ under the License.
 									<exclude>org.scala-lang:scala-library</exclude>
 									<exclude>org.scala-lang:scala-compiler</exclude>
 									<exclude>org.scala-lang:scala-reflect</exclude>
-									<exclude>com.typesafe.akka:akka-actor_*</exclude>
-									<exclude>com.typesafe.akka:akka-remote_*</exclude>
-									<exclude>com.typesafe.akka:akka-slf4j_*</exclude>
+									<exclude>com.data-artisans:flakka-actor_*</exclude>
+									<exclude>com.data-artisans:flakka-remote_*</exclude>
+									<exclude>com.data-artisans:flakka-slf4j_*</exclude>
 									<exclude>io.netty:netty-all</exclude>
 									<exclude>io.netty:netty</exclude>
 									<exclude>commons-fileupload:commons-fileupload</exclude>

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
index afb9b50..24225f6 100644
--- a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
+++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
@@ -199,9 +199,9 @@ under the License.
 									<exclude>org.scala-lang:scala-library</exclude>
 									<exclude>org.scala-lang:scala-compiler</exclude>
 									<exclude>org.scala-lang:scala-reflect</exclude>
-									<exclude>com.typesafe.akka:akka-actor_*</exclude>
-									<exclude>com.typesafe.akka:akka-remote_*</exclude>
-									<exclude>com.typesafe.akka:akka-slf4j_*</exclude>
+									<exclude>com.data-artisans:flakka-actor_*</exclude>
+									<exclude>com.data-artisans:flakka-remote_*</exclude>
+									<exclude>com.data-artisans:flakka-slf4j_*</exclude>
 									<exclude>io.netty:netty-all</exclude>
 									<exclude>io.netty:netty</exclude>
 									<exclude>commons-fileupload:commons-fileupload</exclude>

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime-web/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml
index badb9ba..a6397a4 100644
--- a/flink-runtime-web/pom.xml
+++ b/flink-runtime-web/pom.xml
@@ -92,8 +92,8 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-testkit_${scala.binary.version}</artifactId>
+			<groupId>com.data-artisans</groupId>
+			<artifactId>flakka-testkit_${scala.binary.version}</artifactId>
 			<scope>test</scope>
 		</dependency>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index eec75c9..e522d77 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -92,18 +92,18 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-actor_${scala.binary.version}</artifactId>
+			<groupId>com.data-artisans</groupId>
+			<artifactId>flakka-actor_${scala.binary.version}</artifactId>
 		</dependency>
 
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-remote_${scala.binary.version}</artifactId>
+			<groupId>com.data-artisans</groupId>
+			<artifactId>flakka-remote_${scala.binary.version}</artifactId>
 		</dependency>
 
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-slf4j_${scala.binary.version}</artifactId>
+			<groupId>com.data-artisans</groupId>
+			<artifactId>flakka-slf4j_${scala.binary.version}</artifactId>
 		</dependency>
 
 		<dependency>
@@ -193,8 +193,8 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-testkit_${scala.binary.version}</artifactId>
+			<groupId>com.data-artisans</groupId>
+			<artifactId>flakka-testkit_${scala.binary.version}</artifactId>
 		</dependency>
 
 	</dependencies>

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
index 3598a29..c460345 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmanager;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * The command line parameters passed to the TaskManager.
+ * The command line parameters passed to the JobManager.
  */
 public class JobManagerCliOptions {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
index b6d9306..b18cdd0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
@@ -60,12 +60,26 @@ public class LeaderRetrievalUtils {
 	 */
 	public static LeaderRetrievalService createLeaderRetrievalService(Configuration configuration)
 		throws Exception {
+		return createLeaderRetrievalService(configuration, false);
+	}
+
+	/**
+	 * Creates a {@link LeaderRetrievalService} based on the provided {@link Configuration} object.
+	 *
+	 * @param configuration Configuration containing the settings for the {@link LeaderRetrievalService}
+	 * @param resolveInitialHostName If true, resolves the initial hostname
+	 * @return The {@link LeaderRetrievalService} specified in the configuration object
+	 * @throws Exception
+	 */
+	public static LeaderRetrievalService createLeaderRetrievalService(
+			Configuration configuration, boolean resolveInitialHostName)
+		throws Exception {
 
 		HighAvailabilityMode highAvailabilityMode = getRecoveryMode(configuration);
 
 		switch (highAvailabilityMode) {
 			case NONE:
-				return StandaloneUtils.createLeaderRetrievalService(configuration);
+				return StandaloneUtils.createLeaderRetrievalService(configuration, resolveInitialHostName);
 			case ZOOKEEPER:
 				return ZooKeeperUtils.createLeaderRetrievalService(configuration);
 			default:

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java
index 8998add..8436ced 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/StandaloneUtils.java
@@ -22,11 +22,11 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.util.NetUtils;
 import scala.Option;
 import scala.Tuple3;
 
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 
 /**
@@ -43,9 +43,24 @@ public final class StandaloneUtils {
 	 * @throws UnknownHostException
 	 */
 	public static StandaloneLeaderRetrievalService createLeaderRetrievalService(
-			Configuration configuration)
+		Configuration configuration)
 		throws UnknownHostException {
-		return createLeaderRetrievalService(configuration, null);
+		return createLeaderRetrievalService(configuration, false);
+	}
+
+	/**
+	 * Creates a {@link StandaloneLeaderRetrievalService} from the given configuration. The
+	 * host and port for the remote Akka URL are retrieved from the provided configuration.
+	 *
+	 * @param configuration Configuration instance containing the host and port information
+	 * @param resolveInitialHostName If true, resolves the hostname of the StandaloneLeaderRetrievalService
+	 * @return StandaloneLeaderRetrievalService
+	 * @throws UnknownHostException
+	 */
+	public static StandaloneLeaderRetrievalService createLeaderRetrievalService(
+			Configuration configuration, boolean resolveInitialHostName)
+		throws UnknownHostException {
+		return createLeaderRetrievalService(configuration, resolveInitialHostName, null);
 	}
 
 	/**
@@ -55,30 +70,35 @@ public final class StandaloneUtils {
 	 * for the remote Akka URL.
 	 *
 	 * @param configuration Configuration instance containing the host and port information
+	 * @param resolveInitialHostName If true, resolves the hostname of the StandaloneLeaderRetrievalService
 	 * @param jobManagerName Name of the JobManager actor
 	 * @return StandaloneLeaderRetrievalService
 	 * @throws UnknownHostException if the host name cannot be resolved into an {@link InetAddress}
 	 */
 	public static StandaloneLeaderRetrievalService createLeaderRetrievalService(
 			Configuration configuration,
+			boolean resolveInitialHostName,
 			String jobManagerName)
 		throws UnknownHostException {
 
-
 		Tuple3<String, String, Object> stringIntPair = TaskManager.getAndCheckJobManagerAddress(configuration);
 
 		String protocol = stringIntPair._1();
 		String jobManagerHostname = stringIntPair._2();
 		int jobManagerPort = (Integer) stringIntPair._3();
-		InetSocketAddress hostPort;
 
-		try {
-			InetAddress inetAddress = InetAddress.getByName(jobManagerHostname);
-			hostPort = new InetSocketAddress(inetAddress, jobManagerPort);
-		}
-		catch (UnknownHostException e) {
-			throw new UnknownHostException("Cannot resolve the JobManager hostname '" + jobManagerHostname
+		// Do not try to resolve a hostname to prevent resolving to the wrong IP address
+		String hostPort = NetUtils.unresolvedHostAndPortToNormalizedString(jobManagerHostname, jobManagerPort);
+
+		if (resolveInitialHostName) {
+			try {
+				//noinspection ResultOfMethodCallIgnored
+				InetAddress.getByName(jobManagerHostname);
+			}
+			catch (UnknownHostException e) {
+				throw new UnknownHostException("Cannot resolve the JobManager hostname '" + jobManagerHostname
 					+ "' specified in the configuration");
+			}
 		}
 
 		String jobManagerAkkaUrl = JobManager.getRemoteJobManagerAkkaURL(

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 6a23c39..c7bea66 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.{Callable, TimeUnit}
 
 import akka.actor._
 import akka.pattern.{ask => akkaAsk}
-import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions, ConfigValueFactory}
+import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration}
 import org.apache.flink.runtime.net.SSLUtils
@@ -63,8 +63,8 @@ object AkkaUtils {
    * will be instantiated.
    *
    * @param configuration instance containing the user provided configuration values
-   * @param listeningAddress an optional tuple containing a hostname and a port to bind to. If the
-   *                         parameter is None, then a local actor system will be created.
+   * @param listeningAddress an optional tuple containing a bindAddress and a port to bind to.
+    *                        If the parameter is None, then a local actor system will be created.
    * @return created actor system
    */
   def createActorSystem(
@@ -102,21 +102,25 @@ object AkkaUtils {
    * specified, then the actor system will listen on the respective address.
    *
    * @param configuration instance containing the user provided configuration values
-   * @param listeningAddress optional tuple of hostname and port to listen on. If None is given,
-   *                         then an Akka config for local actor system will be returned
+   * @param externalAddress optional tuple of bindAddress and port to be reachable at.
+   *                        If None is given, then an Akka config for local actor system
+   *                        will be returned
    * @return Akka config
    */
   @throws(classOf[UnknownHostException])
   def getAkkaConfig(configuration: Configuration,
-                    listeningAddress: Option[(String, Int)]): Config = {
+                    externalAddress: Option[(String, Int)]): Config = {
     val defaultConfig = getBasicAkkaConfig(configuration)
 
-    listeningAddress match {
+    externalAddress match {
 
       case Some((hostname, port)) =>
-        val ipAddress = InetAddress.getByName(hostname)
-        val hostString = "\"" + NetUtils.ipAddressToUrlString(ipAddress) + "\""
-        val remoteConfig = getRemoteAkkaConfig(configuration, hostString, port)
+
+        val remoteConfig = getRemoteAkkaConfig(configuration,
+          // the wildcard IP lets us bind to all network interfaces
+          NetUtils.getWildcardIPAddress, port,
+          hostname, port)
+
         remoteConfig.withFallback(defaultConfig)
 
       case None =>
@@ -213,15 +217,19 @@ object AkkaUtils {
 
   /**
    * Creates a Akka config for a remote actor system listening on port on the network interface
-   * identified by hostname.
+   * identified by bindAddress.
    *
    * @param configuration instance containing the user provided configuration values
-   * @param hostname of the network interface to listen on
+   * @param bindAddress of the network interface to bind on
    * @param port to bind to or if 0 then Akka picks a free port automatically
+   * @param externalHostname The host name to expect for Akka messages
+   * @param externalPort The port to expect for Akka messages
    * @return Flink's Akka configuration for remote actor systems
    */
   private def getRemoteAkkaConfig(configuration: Configuration,
-                                  hostname: String, port: Int): Config = {
+                                  bindAddress: String, port: Int,
+                                  externalHostname: String, externalPort: Int): Config = {
+
     val akkaAskTimeout = Duration(configuration.getString(
       ConfigConstants.AKKA_ASK_TIMEOUT,
       ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT))
@@ -322,7 +330,8 @@ object AkkaUtils {
          |    netty {
          |      tcp {
          |        transport-class = "akka.remote.transport.netty.NettyTransport"
-         |        port = $port
+         |        port = $externalPort
+         |        bind-port = $port
          |        connection-timeout = $akkaTCPTimeout
          |        maximum-frame-size = $akkaFramesize
          |        tcp-nodelay = on
@@ -334,24 +343,29 @@ object AkkaUtils {
          |}
        """.stripMargin
 
-      val hostnameConfigString = if(hostname != null && hostname.nonEmpty){
-        s"""
-           |akka {
-           |  remote {
-           |    netty {
-           |      tcp {
-           |        hostname = $hostname
-           |      }
-           |    }
-           |  }
-           |}
-         """.stripMargin
-      }else{
-        // if hostname is null or empty, then leave hostname unspecified. Akka will pick
+    val effectiveHostname =
+      if (externalHostname != null && externalHostname.nonEmpty) {
+        externalHostname
+      } else {
+        // if bindAddress is null or empty, then leave bindAddress unspecified. Akka will pick
         // InetAddress.getLocalHost.getHostAddress
-        ""
+        "\"\""
       }
 
+    val hostnameConfigString =
+      s"""
+         |akka {
+         |  remote {
+         |    netty {
+         |      tcp {
+         |        hostname = $effectiveHostname
+         |        bind-hostname = $bindAddress
+         |      }
+         |    }
+         |  }
+         |}
+       """.stripMargin
+
     val sslConfigString = if (akkaEnableSSLConfig) {
       s"""
          |akka {

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 8c686cd..c5682e2 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -29,7 +29,7 @@ import akka.pattern.ask
 import grizzled.slf4j.Logger
 import org.apache.flink.api.common.JobID
 import org.apache.flink.api.common.time.Time
-import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration, HighAvailabilityOptions}
+import org.apache.flink.configuration._
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.core.io.InputSplitAssigner
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup
@@ -149,7 +149,7 @@ class JobManager(
     case Some(registry) =>
       val host = flinkConfiguration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
       Option(new JobManagerMetricGroup(
-        registry, NetUtils.ipAddressToUrlString(InetAddress.getByName(host))))
+        registry, NetUtils.unresolvedHostToNormalizedString(host)))
     case None =>
       log.warn("Could not instantiate JobManager metrics.")
       None
@@ -1923,8 +1923,8 @@ object JobManager {
     // parsing the command line arguments
     val (configuration: Configuration,
          executionMode: JobManagerMode,
-         listeningHost: String,
-         listeningPortRange: java.util.Iterator[Integer]) =
+         externalHostName: String,
+         portRange: java.util.Iterator[Integer]) =
     try {
       parseArgs(args)
     }
@@ -1939,14 +1939,14 @@ object JobManager {
     // we want to check that the JobManager hostname is in the config
     // if it is not in there, the actor system will bind to the loopback interface's
     // address and will not be reachable from anyone remote
-    if (listeningHost == null) {
+    if (externalHostName == null) {
       val message = "Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY +
         "' is missing (hostname/address to bind JobManager to)."
       LOG.error(message)
       System.exit(STARTUP_FAILURE_RETURN_CODE)
     }
 
-    if (!listeningPortRange.hasNext) {
+    if (!portRange.hasNext) {
       if (ZooKeeperUtils.isZooKeeperRecoveryMode(configuration)) {
         val message = "Config parameter '" + ConfigConstants.HA_JOB_MANAGER_PORT +
           "' does not specify a valid port range."
@@ -1954,7 +1954,7 @@ object JobManager {
         System.exit(STARTUP_FAILURE_RETURN_CODE)
       }
       else {
-        val message = s"Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY +
+        val message = s"Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY +
           "' does not specify a valid port."
         LOG.error(message)
         System.exit(STARTUP_FAILURE_RETURN_CODE)
@@ -1970,8 +1970,8 @@ object JobManager {
           runJobManager(
             configuration,
             executionMode,
-            listeningHost,
-            listeningPortRange)
+            externalHostName,
+            portRange)
         }
       })
     } catch {
@@ -2080,7 +2080,7 @@ object JobManager {
           override def createSocket(port: Int): ServerSocket = new ServerSocket(
             // Use the correct listening address, bound ports will only be
             // detected later by Akka.
-            port, 0, InetAddress.getByName(listeningAddress))
+            port, 0, InetAddress.getByName(NetUtils.getWildcardIPAddress))
         })
 
       val port =
@@ -2158,8 +2158,8 @@ object JobManager {
     * @param configuration The configuration object for the JobManager
     * @param executionMode The execution mode in which to run. Execution mode LOCAL with spawn an
     *                      additional TaskManager in the same process.
-    * @param listeningAddress The hostname where the JobManager should listen for messages.
-    * @param listeningPort The port where the JobManager should listen for messages
+    * @param externalHostname The hostname where the JobManager is reachable for rpc communication
+    * @param port The port where the JobManager is reachable for rpc communication
     * @param futureExecutor to run the JobManager's futures
     * @param ioExecutor to run blocking io operations
     * @param jobManagerClass The class of the JobManager to be started
@@ -2171,8 +2171,8 @@ object JobManager {
   def startActorSystemAndJobManagerActors(
       configuration: Configuration,
       executionMode: JobManagerMode,
-      listeningAddress: String,
-      listeningPort: Int,
+      externalHostname: String,
+      port: Int,
       futureExecutor: Executor,
       ioExecutor: Executor,
       jobManagerClass: Class[_ <: JobManager],
@@ -2180,16 +2180,15 @@ object JobManager {
       resourceManagerClass: Option[Class[_ <: FlinkResourceManager[_]]])
     : (ActorSystem, ActorRef, ActorRef, Option[WebMonitor], Option[ActorRef]) = {
 
-    LOG.info("Starting JobManager")
+    val hostPort = NetUtils.unresolvedHostAndPortToNormalizedString(externalHostname, port)
 
     // Bring up the job manager actor system first, bind it to the given address.
-    val hostPortUrl = NetUtils.hostAndPortToUrlString(listeningAddress, listeningPort)
-    LOG.info(s"Starting JobManager actor system at $hostPortUrl")
+    LOG.info(s"Starting JobManager actor system reachable at $hostPort")
 
     val jobManagerSystem = try {
       val akkaConfig = AkkaUtils.getAkkaConfig(
         configuration,
-        Some((listeningAddress, listeningPort))
+        Some((externalHostname, port))
       )
       if (LOG.isDebugEnabled) {
         LOG.debug("Using akka configuration\n " + akkaConfig)
@@ -2201,8 +2200,7 @@ object JobManager {
         if (t.isInstanceOf[org.jboss.netty.channel.ChannelException]) {
           val cause = t.getCause()
           if (cause != null && t.getCause().isInstanceOf[java.net.BindException]) {
-            val address = listeningAddress + ":" + listeningPort
-            throw new Exception("Unable to create JobManager at address " + address +
+            throw new Exception("Unable to create JobManager at address " + hostPort +
               " - " + cause.getMessage(), t)
           }
         }
@@ -2267,7 +2265,7 @@ object JobManager {
           configuration,
           ResourceID.generate(),
           jobManagerSystem,
-          listeningAddress,
+          externalHostname,
           Some(TaskManager.TASK_MANAGER_NAME),
           None,
           localTaskManagerCommunication = true,
@@ -2360,17 +2358,17 @@ object JobManager {
       }
     }
 
-    val config = parser.parse(args, new JobManagerCliOptions()).getOrElse {
+    val cliOptions = parser.parse(args, new JobManagerCliOptions()).getOrElse {
       throw new Exception(
         s"Invalid command line arguments: ${args.mkString(" ")}. Usage: ${parser.usage}")
     }
     
-    val configDir = config.getConfigDir()
+    val configDir = cliOptions.getConfigDir()
     
     if (configDir == null) {
       throw new Exception("Missing parameter '--configDir'")
     }
-    if (config.getJobManagerMode() == null) {
+    if (cliOptions.getJobManagerMode() == null) {
       throw new Exception("Missing parameter '--executionMode'")
     }
 
@@ -2391,12 +2389,12 @@ object JobManager {
       configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDir + "/..")
     }
 
-    if (config.getWebUIPort() >= 0) {
-      configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, config.getWebUIPort())
+    if (cliOptions.getWebUIPort() >= 0) {
+      configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, cliOptions.getWebUIPort())
     }
 
-    if (config.getHost() != null) {
-      configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, config.getHost())
+    if (cliOptions.getHost() != null) {
+      configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, cliOptions.getHost())
     }
 
     val host = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
@@ -2429,10 +2427,9 @@ object JobManager {
         String.valueOf(listeningPort)
       }
 
-    val executionMode = config.getJobManagerMode
-    val hostUrl = NetUtils.ipAddressToUrlString(InetAddress.getByName(host))
+    val executionMode = cliOptions.getJobManagerMode
 
-    LOG.info(s"Starting JobManager on $hostUrl:$portRange with execution mode $executionMode")
+    LOG.info(s"Starting JobManager on $host:$portRange with execution mode $executionMode")
 
     val portRangeIterator = NetUtils.getPortRangeFromString(portRange)
 
@@ -2735,20 +2732,18 @@ object JobManager {
    * where the JobManager's actor system runs.
    *
    * @param protocol The protocol to be used to connect to the remote JobManager's actor system.
-   * @param address The address of the JobManager's actor system.
+   * @param hostPort The external address of the JobManager's actor system in format host:port
    * @return The akka URL of the JobManager actor.
    */
   def getRemoteJobManagerAkkaURL(
       protocol: String,
-      address: InetSocketAddress,
+      hostPort: String,
       name: Option[String] = None)
     : String = {
 
     require(protocol == "akka.tcp" || protocol == "akka.ssl.tcp",
         "protocol field should be either akka.tcp or akka.ssl.tcp")
 
-    val hostPort = NetUtils.socketAddressToUrlString(address)
-
     getJobManagerAkkaURLHelper(s"$protocol://flink@$hostPort", name)
   }
 
@@ -2761,17 +2756,7 @@ object JobManager {
   def getRemoteJobManagerAkkaURL(config: Configuration) : String = {
     val (protocol, hostname, port) = TaskManager.getAndCheckJobManagerAddress(config)
 
-    var hostPort: InetSocketAddress = null
-
-    try {
-      val inetAddress: InetAddress = InetAddress.getByName(hostname)
-      hostPort = new InetSocketAddress(inetAddress, port)
-    }
-    catch {
-      case e: UnknownHostException =>
-        throw new UnknownHostException(s"Cannot resolve the JobManager hostname '$hostname' " +
-          s"specified in the configuration")
-    }
+    val hostPort = NetUtils.unresolvedHostAndPortToNormalizedString(hostname, port)
 
     JobManager.getRemoteJobManagerAkkaURL(protocol, hostPort, Option.empty)
   }
@@ -2794,15 +2779,6 @@ object JobManager {
     address + "/user/" + name.getOrElse(JOB_MANAGER_NAME)
   }
 
-  def getJobManagerActorRefFuture(
-      protocol: String,
-      address: InetSocketAddress,
-      system: ActorSystem,
-      timeout: FiniteDuration)
-    : Future[ActorRef] = {
-    AkkaUtils.getActorRefFuture(getRemoteJobManagerAkkaURL(protocol, address), system, timeout)
-  }
-
   /**
    * Resolves the JobManager actor reference in a blocking fashion.
    *
@@ -2825,7 +2801,7 @@ object JobManager {
    * Resolves the JobManager actor reference in a blocking fashion.
    *
    * @param protocol The protocol to be used to connect to the remote JobManager's actor system.
-   * @param address The socket address of the JobManager's actor system.
+   * @param hostPort The external address of the JobManager's actor system in format host:port.
    * @param system The local actor system that should perform the lookup.
    * @param timeout The maximum time to wait until the lookup fails.
    * @throws java.io.IOException Thrown, if the lookup fails.
@@ -2834,19 +2810,19 @@ object JobManager {
   @throws(classOf[IOException])
   def getJobManagerActorRef(
       protocol: String,
-      address: InetSocketAddress,
+      hostPort: String,
       system: ActorSystem,
       timeout: FiniteDuration)
     : ActorRef = {
 
-    val jmAddress = getRemoteJobManagerAkkaURL(protocol, address)
+    val jmAddress = getRemoteJobManagerAkkaURL(protocol, hostPort)
     getJobManagerActorRef(jmAddress, system, timeout)
   }
 
   /**
    * Resolves the JobManager actor reference in a blocking fashion.
    *
-   * @param address The socket address of the JobManager's actor system.
+   * @param hostPort The address of the JobManager's actor system in format host:port.
    * @param system The local actor system that should perform the lookup.
    * @param config The config describing the maximum time to wait until the lookup fails.
    * @throws java.io.IOException Thrown, if the lookup fails.
@@ -2854,13 +2830,13 @@ object JobManager {
    */
   @throws(classOf[IOException])
   def getJobManagerActorRef(
-      address: InetSocketAddress,
+      hostPort: String,
       system: ActorSystem,
       config: Configuration)
     : ActorRef = {
 
     val timeout = AkkaUtils.getLookupTimeout(config)
     val protocol = AkkaUtils.getAkkaProtocol(config)
-    getJobManagerActorRef(protocol, address, system, timeout)
+    getJobManagerActorRef(protocol, hostPort, system, timeout)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index dc59048..88d7b3a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -69,7 +69,7 @@ abstract class FlinkMiniCluster(
   // not getLocalHost(), which may be 127.0.1.1
   val hostname = userConfiguration.getString(
     ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
-    InetAddress.getByName("localhost").getHostAddress())
+    "localhost")
 
   protected val originalConfiguration = generateConfiguration(userConfiguration)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 41d3077..ec5ff3d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -2089,9 +2089,12 @@ object TaskManager {
 
     val leaderRetrievalService = leaderRetrievalServiceOption match {
       case Some(lrs) => lrs
-      case None => LeaderRetrievalUtils.createLeaderRetrievalService(configuration)
+      case None =>
+        // validate the address if possible (e.g. we're in Standalone mode)
+        LeaderRetrievalUtils.createLeaderRetrievalService(configuration, true)
     }
 
+
     val metricsRegistry = new FlinkMetricRegistry(
       MetricRegistryConfiguration.fromConfiguration(configuration))
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
index fbe6e8f..49c28e6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
@@ -29,6 +29,7 @@ import akka.actor.PoisonPill;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.NetUtils;
 import org.junit.Test;
 
 import org.apache.flink.configuration.Configuration;
@@ -40,7 +41,6 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.StringWriter;
-import java.net.InetSocketAddress;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -119,7 +119,7 @@ public class JobManagerProcessReapingTest {
 				try {
 					jobManagerRef = JobManager.getJobManagerActorRef(
 						"akka.tcp",
-						new InetSocketAddress("localhost", jobManagerPort),
+						NetUtils.unresolvedHostAndPortToNormalizedString("localhost", jobManagerPort),
 						localSystem, new FiniteDuration(25, TimeUnit.SECONDS));
 				} catch (Throwable t) {
 					// job manager probably not ready yet

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
index 58761cb..0ab1b67 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
@@ -82,7 +82,7 @@ public class JobManagerStartupTest {
 		
 		try {
 			portNum = NetUtils.getAvailablePort();
-			portOccupier = new ServerSocket(portNum, 10, InetAddress.getByName("localhost"));
+			portOccupier = new ServerSocket(portNum, 10, InetAddress.getByName("0.0.0.0"));
 		}
 		catch (Throwable t) {
 			// could not find free port, or open a connection there
@@ -95,7 +95,7 @@ public class JobManagerStartupTest {
 		}
 		catch (Exception e) {
 			// expected
-			List<Throwable> causes = StartupUtils.getExceptionCauses(e,new ArrayList<Throwable>());
+			List<Throwable> causes = StartupUtils.getExceptionCauses(e, new ArrayList<Throwable>());
 			for(Throwable cause:causes) {
 				if(cause instanceof BindException) {
 					throw (BindException) cause;

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
index 8b8987b..6a8ff17 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
@@ -103,10 +104,10 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 			client[0] = ZooKeeperUtils.startCuratorFramework(config);
 			client[1] = ZooKeeperUtils.startCuratorFramework(config);
 
-			InetSocketAddress wrongInetSocketAddress = new InetSocketAddress(InetAddress.getByName("1.1.1.1"), 1234);
+			String wrongHostPort = NetUtils.unresolvedHostAndPortToNormalizedString("1.1.1.1", 1234);
 
 			String wrongAddress = JobManager.getRemoteJobManagerAkkaURL(AkkaUtils.getAkkaProtocol(config),
-					wrongInetSocketAddress, Option.<String>empty());
+					wrongHostPort, Option.<String>empty());
 
 			try {
 				localHost = InetAddress.getLocalHost();
@@ -123,9 +124,10 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
 			}
 
 			InetSocketAddress correctInetSocketAddress = new InetSocketAddress(localHost, serverSocket.getLocalPort());
+			String hostPort = NetUtils.unresolvedHostAndPortToNormalizedString(localHost.getHostName(), correctInetSocketAddress.getPort());
 
 			String correctAddress = JobManager.getRemoteJobManagerAkkaURL(AkkaUtils.getAkkaProtocol(config),
-					correctInetSocketAddress, Option.<String>empty());
+				hostPort, Option.<String>empty());
 
 			faultyLeaderElectionService = ZooKeeperUtils.createLeaderElectionService(client[0], config);
 			TestingContender wrongLeaderAddressContender = new TestingContender(wrongAddress, faultyLeaderElectionService);

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
index dead732..385d1ac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
@@ -40,7 +40,6 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.StringWriter;
-import java.net.InetAddress;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
@@ -91,11 +90,10 @@ public abstract class TaskManagerProcessReapingTestBase {
 			tempLogFile.deleteOnExit();
 			CommonTestUtils.printLog4jDebugConfig(tempLogFile);
 
-			final InetAddress localhost = InetAddress.getByName("localhost");
 			final int jobManagerPort = NetUtils.getAvailablePort();
 
 			// start a JobManager
-			Tuple2<String, Object> localAddress = new Tuple2<String, Object>(localhost.getHostAddress(), jobManagerPort);
+			Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);
 			jmActorSystem = AkkaUtils.createActorSystem(
 					new Configuration(), new Some<Tuple2<String, Object>>(localAddress));
 
@@ -137,7 +135,7 @@ public abstract class TaskManagerProcessReapingTestBase {
 			// grab the reference to the TaskManager. try multiple times, until the process
 			// is started and the TaskManager is up
 			String taskManagerActorName = String.format("akka.tcp://flink@%s/user/%s",
-					org.apache.flink.util.NetUtils.ipAddressAndPortToUrlString(localhost, taskManagerPort),
+					"localhost:" + taskManagerPort,
 					TaskManager.TASK_MANAGER_NAME());
 
 			ActorRef taskManagerRef = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
index 686de76..b2a905d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.util.StartupUtils;
+import org.apache.flink.util.NetUtils;
 import org.junit.Test;
 import scala.Option;
 
@@ -57,10 +58,10 @@ public class TaskManagerStartupTest {
 		ServerSocket blocker = null;
 		try {
 			final String localHostName = "localhost";
-			final InetAddress localAddress = InetAddress.getByName(localHostName);
+			final InetAddress localBindAddress = InetAddress.getByName(NetUtils.getWildcardIPAddress());
 
 			// block some port
-			blocker = new ServerSocket(0, 50, localAddress);
+			blocker = new ServerSocket(0, 50, localBindAddress);
 			final int port = blocker.getLocalPort();
 
 			TaskManager.runTaskManager(localHostName, ResourceID.generate(), port, new Configuration(),
@@ -69,7 +70,7 @@ public class TaskManagerStartupTest {
 
 		}
 		catch (IOException e) {
-			// expected. validate the error messagex
+			// expected. validate the error message
 			List<Throwable> causes = StartupUtils.getExceptionCauses(e, new ArrayList<Throwable>());
 			for (Throwable cause : causes) {
 				if (cause instanceof BindException) {

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
index 387b0fd..48c65c0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
@@ -25,13 +25,13 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.JobManagerMode;
+import org.apache.flink.util.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
@@ -148,7 +148,7 @@ public class JobManagerProcess extends TestJvmProcess {
 
 		return JobManager.getRemoteJobManagerAkkaURL(
 				AkkaUtils.getAkkaProtocol(config),
-				new InetSocketAddress("localhost", port),
+				NetUtils.unresolvedHostAndPortToNormalizedString("localhost", port),
 				Option.<String>empty());
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
index a18024f..0daac2e 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
@@ -21,8 +21,9 @@ package org.apache.flink.runtime.akka
 import java.net.InetSocketAddress
 
 import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.util.NetUtils
 import org.junit.runner.RunWith
-import org.scalatest.{FunSuite, BeforeAndAfterAll, Matchers}
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
 import org.scalatest.junit.JUnitRunner
 
 @RunWith(classOf[JUnitRunner])
@@ -39,7 +40,7 @@ class AkkaUtilsTest
 
     val remoteAkkaURL = JobManager.getRemoteJobManagerAkkaURL(
       "akka.tcp",
-      address,
+      NetUtils.unresolvedHostAndPortToNormalizedString(host, port),
       Some("actor"))
 
     val result = AkkaUtils.getInetSockeAddressFromAkkaURL(remoteAkkaURL)

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
index 6013309..1489fb2 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerConnectionTest.scala
@@ -53,7 +53,7 @@ class JobManagerConnectionTest {
         case _ : Throwable => return
       }
 
-      val endpoint = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), freePort)
+      val endpoint = NetUtils.unresolvedHostAndPortToNormalizedString("127.0.0.1", freePort)
       val config = createConfigWithLowTimeout()
 
       mustReturnWithinTimeout(Duration(5*timeout, TimeUnit.MILLISECONDS)) {
@@ -89,7 +89,7 @@ class JobManagerConnectionTest {
 
     try {
       // some address that is not running a JobManager
-      val endpoint = new InetSocketAddress(InetAddress.getByName("10.254.254.254"), 2)
+      val endpoint = NetUtils.unresolvedHostAndPortToNormalizedString("10.254.254.254", 2)
       val config = createConfigWithLowTimeout()
 
       mustReturnWithinTimeout(Duration(5*timeout, TimeUnit.MILLISECONDS)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index efc95ab..7929e27 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -168,8 +168,8 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-testkit_${scala.binary.version}</artifactId>
+			<groupId>com.data-artisans</groupId>
+			<artifactId>flakka-testkit_${scala.binary.version}</artifactId>
 		</dependency>
 		
 		<dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-yarn-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
index 3c2bc67..6baf5a8 100644
--- a/flink-yarn-tests/pom.xml
+++ b/flink-yarn-tests/pom.xml
@@ -99,8 +99,8 @@ under the License.
 		</dependency>
 		
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-testkit_${scala.binary.version}</artifactId>
+			<groupId>com.data-artisans</groupId>
+			<artifactId>flakka-testkit_${scala.binary.version}</artifactId>
 			<scope>test</scope>
 		</dependency>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 72a7122..d959e14 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -90,7 +90,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 
 	/**
 	 * Tests that the application master can be killed multiple times and that the surviving
-	 * TaskManager succesfully reconnects to the newly started JobManager.
+	 * TaskManager successfully reconnects to the newly started JobManager.
 	 * @throws Exception
 	 */
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn/pom.xml b/flink-yarn/pom.xml
index 9a3cc8e..39d9379 100644
--- a/flink-yarn/pom.xml
+++ b/flink-yarn/pom.xml
@@ -65,18 +65,18 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-actor_${scala.binary.version}</artifactId>
+			<groupId>com.data-artisans</groupId>
+			<artifactId>flakka-actor_${scala.binary.version}</artifactId>
 		</dependency>
 
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-remote_${scala.binary.version}</artifactId>
+			<groupId>com.data-artisans</groupId>
+			<artifactId>flakka-remote_${scala.binary.version}</artifactId>
 		</dependency>
 
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-camel_${scala.binary.version}</artifactId>
+			<groupId>com.data-artisans</groupId>
+			<artifactId>flakka-camel_${scala.binary.version}</artifactId>
 		</dependency>
 
 		<dependency>
@@ -86,8 +86,8 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<groupId>com.typesafe.akka</groupId>
-			<artifactId>akka-testkit_${scala.binary.version}</artifactId>
+			<groupId>com.data-artisans</groupId>
+			<artifactId>flakka-testkit_${scala.binary.version}</artifactId>
 			<scope>test</scope>
 		</dependency>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 670f8a2..5606719 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -625,7 +625,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 			}
 			//------------------ ClusterClient deployed, handle connection details
 			String jobManagerAddress =
-				yarnCluster.getJobManagerAddress().getAddress().getHostAddress() +
+				yarnCluster.getJobManagerAddress().getAddress().getHostName() +
 					":" + yarnCluster.getJobManagerAddress().getPort();
 
 			System.out.println("Flink JobManager is now running on " + jobManagerAddress);

http://git-wip-us.apache.org/repos/asf/flink/blob/27ebdf7a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 04ba726..0842517 100644
--- a/pom.xml
+++ b/pom.xml
@@ -93,7 +93,7 @@ under the License.
 		<log4j.configuration>log4j-test.properties</log4j.configuration>
 		<slf4j.version>1.7.7</slf4j.version>
 		<guava.version>18.0</guava.version>
-		<akka.version>2.3.7</akka.version>
+		<akka.version>2.3-custom</akka.version>
 		<java.version>1.7</java.version>
 		<scala.macros.version>2.0.1</scala.macros.version>
 		<!-- Default scala versions, may be overwritten by build profiles -->
@@ -334,32 +334,32 @@ under the License.
 			</dependency>
 
 			<dependency>
-				<groupId>com.typesafe.akka</groupId>
-				<artifactId>akka-actor_${scala.binary.version}</artifactId>
+				<groupId>com.data-artisans</groupId>
+				<artifactId>flakka-actor_${scala.binary.version}</artifactId>
 				<version>${akka.version}</version>
 			</dependency>
 
 			<dependency>
-				<groupId>com.typesafe.akka</groupId>
-				<artifactId>akka-remote_${scala.binary.version}</artifactId>
+				<groupId>com.data-artisans</groupId>
+				<artifactId>flakka-remote_${scala.binary.version}</artifactId>
 				<version>${akka.version}</version>
 			</dependency>
 
 			<dependency>
-				<groupId>com.typesafe.akka</groupId>
-				<artifactId>akka-slf4j_${scala.binary.version}</artifactId>
+				<groupId>com.data-artisans</groupId>
+				<artifactId>flakka-slf4j_${scala.binary.version}</artifactId>
 				<version>${akka.version}</version>
 			</dependency>
 
 			<dependency>
-				<groupId>com.typesafe.akka</groupId>
-				<artifactId>akka-camel_${scala.binary.version}</artifactId>
+				<groupId>com.data-artisans</groupId>
+				<artifactId>flakka-camel_${scala.binary.version}</artifactId>
 				<version>${akka.version}</version>
 			</dependency>
 
 			<dependency>
-				<groupId>com.typesafe.akka</groupId>
-				<artifactId>akka-testkit_${scala.binary.version}</artifactId>
+				<groupId>com.data-artisans</groupId>
+				<artifactId>flakka-testkit_${scala.binary.version}</artifactId>
 				<version>${akka.version}</version>
 				<scope>test</scope>
 			</dependency>
@@ -499,8 +499,8 @@ under the License.
 							com.sun.tools.javac.code.Symbol$CompletionFailure:
 								class file for akka.testkit.TestKit not found"
 					-->
-					<groupId>com.typesafe.akka</groupId>
-					<artifactId>akka-testkit_${scala.binary.version}</artifactId>
+					<groupId>com.data-artisans</groupId>
+					<artifactId>flakka-testkit_${scala.binary.version}</artifactId>
 					<version>${akka.version}</version>
 					<scope>provided</scope>
 				</dependency>
@@ -627,8 +627,8 @@ under the License.
 							com.sun.tools.javac.code.Symbol$CompletionFailure:
 								class file for akka.testkit.TestKit not found"
 					-->
-					<groupId>com.typesafe.akka</groupId>
-					<artifactId>akka-testkit_${scala.binary.version}</artifactId>
+					<groupId>com.data-artisans</groupId>
+					<artifactId>flakka-testkit_${scala.binary.version}</artifactId>
 					<version>${akka.version}</version>
 					<scope>provided</scope>
 				</dependency>