You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/08/31 12:31:45 UTC

[09/10] flink git commit: [FLINK-2291] [runtime] Add ZooKeeper support to elect a leader from a set of JobManagers. The leader will then be retrieved from ZooKeeper by the TaskManagers.

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 355da2d..dc242b2 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -452,6 +452,31 @@ public final class ConfigConstants {
 	
 	public static final String FLINK_JVM_OPTIONS = "env.java.opts";
 
+  	// --------------------------- Recovery -----------------------------------
+
+	/** Defines the recovery mode used for cluster execution ("standalone" or "zookeeper"). */
+	public static final String RECOVERY_MODE = "recovery.mode";
+
+  	// --------------------------- ZooKeeper ----------------------------------
+
+	/** ZooKeeper servers. */
+	public static final String ZOOKEEPER_QUORUM_KEY = "ha.zookeeper.quorum";
+
+	/** ZooKeeper root path. */
+	public static final String ZOOKEEPER_DIR_KEY = "ha.zookeeper.dir";
+
+	public static final String ZOOKEEPER_LATCH_PATH = "ha.zookeeper.dir.latch";
+
+	public static final String ZOOKEEPER_LEADER_PATH = "ha.zookeeper.dir.leader";
+
+	public static final String ZOOKEEPER_SESSION_TIMEOUT = "ha.zookeeper.client.session-timeout";
+
+	public static final String ZOOKEEPER_CONNECTION_TIMEOUT = "ha.zookeeper.client.connection-timeout";
+
+	public static final String ZOOKEEPER_RETRY_WAIT = "ha.zookeeper.client.retry-wait";
+
+	public static final String ZOOKEEPER_MAX_RETRY_ATTEMPTS = "ha.zookeeper.client.max-retry-attempts";
+
 	// ------------------------------------------------------------------------
 	//                            Default Values
 	// ------------------------------------------------------------------------
@@ -694,33 +719,23 @@ public final class ConfigConstants {
 	/**
 	 * Sets the number of local task managers
 	 */
-	public static final String LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER = "localinstancemanager.numtaskmanager";
-
-	public static final String LOCAL_INSTANCE_MANAGER_START_WEBSERVER = "localinstancemanager.start-webserver";
-
-	// --------------------------- ZooKeeper ----------------------------------
-
-	/** ZooKeeper servers. */
-	public static final String ZOOKEEPER_QUORUM_KEY = "ha.zookeeper.quorum";
-
-	/** ZooKeeper root path. */
-	public static final String ZOOKEEPER_DIR_KEY = "ha.zookeeper.dir";
+	public static final String LOCAL_NUMBER_TASK_MANAGER = "local.number-taskmanager";
 
-	public static final String ZOOKEEPER_LATCH_PATH = "ha.zookeeper.dir.latch";
+	public static final int DEFAULT_LOCAL_NUMBER_TASK_MANAGER = 1;
 
-	public static final String ZOOKEEPER_LEADER_PATH = "ha.zookeeper.dir.leader";
+	public static final String LOCAL_NUMBER_JOB_MANAGER = "local.number-jobmanager";
 
-	public static final String ZOOKEEPER_SESSION_TIMEOUT = "ha.zookeeper.client.session-timeout";
+	public static final int DEFAULT_LOCAL_NUMBER_JOB_MANAGER = 1;
 
-	public static final String ZOOKEEPER_CONNECTION_TIMEOUT = "ha.zookeeper.client.connection-timeout";
+	public static final String LOCAL_START_WEBSERVER = "local.start-webserver";
 
-	public static final String ZOOKEEPER_RETRY_WAIT = "ha.zookeeper.client.retry-wait";
+  	// --------------------------- Recovery ---------------------------------
 
-	public static final String ZOOKEEPER_MAX_RETRY_ATTEMPTS = "ha.zookeeper.client.max-retry-attempts";
+	public static String DEFAULT_RECOVERY_MODE = "standalone";
 
-	// - Defaults -------------------------------------------------------------
+	// --------------------------- ZooKeeper ----------------------------------
 
-	public static final String DEFAULT_ZOOKEEPER_ZNODE_ROOT = "/flink";
+	public static final String DEFAULT_ZOOKEEPER_DIR_KEY = "/flink";
 
 	public static final String DEFAULT_ZOOKEEPER_LATCH_PATH = "/leaderlatch";
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
index 3676e62..dac5144 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
@@ -22,10 +22,6 @@ package org.apache.flink.types.parser;
 import org.apache.flink.types.parser.ByteParser;
 import org.apache.flink.types.parser.FieldParser;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 
 public class ByteParserTest extends ParserTestBase<Byte> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-core/src/test/java/org/apache/flink/util/AbstractIDTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/AbstractIDTest.java b/flink-core/src/test/java/org/apache/flink/util/AbstractIDTest.java
index 8de3b94..8134a68 100644
--- a/flink-core/src/test/java/org/apache/flink/util/AbstractIDTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/AbstractIDTest.java
@@ -25,8 +25,6 @@ import static org.junit.Assert.fail;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.junit.Test;
 
-import java.nio.ByteBuffer;
-
 /**
  * This class contains tests for the {@link org.apache.flink.util.AbstractID} class.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-dist/src/main/flink-bin/LICENSE
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/LICENSE b/flink-dist/src/main/flink-bin/LICENSE
index e79ff71..9df6da5 100644
--- a/flink-dist/src/main/flink-bin/LICENSE
+++ b/flink-dist/src/main/flink-bin/LICENSE
@@ -278,6 +278,10 @@ under the Apache License (v 2.0):
  - Jansi (org.fusesource.jansi:jansi:1.4 - https://github.com/fusesource/jansi)
  - Apache Camel Core (org.apache.camel:camel-core:2.10.3 - http://camel.apache.org/camel-core.html)
  - Apache Commons Math (org.apache.commons:commons-math3:3.5 - http://commons.apache.org/proper/commons-math/index.html)
+ - Apache ZooKeeper (org.apache.zookeeper:zookeeper:3.4.6 - https://zookeeper.apache.org/)
+ - Apache Curator (org.apache.curator:curator-recipes:2.8.0 - http://curator.apache.org/)
+ - Apache Curator (org.apache.curator:curator-framework:2.8.0 - http://curator.apache.org/)
+ - Apache Curator (org.apache.curator:curator-client:2.8.0 - http://curator.apache.org/)
 
 
 -----------------------------------------------------------------------
@@ -356,6 +360,7 @@ BSD-style licenses:
  - ASM (org.ow2.asm:asm:5.0.4 - http://asm.ow2.org/) - Copyright (c) 2000-2011 INRIA, France Telecom
  - Grizzled SLF4J (org.clapper:grizzled-slf4j_2.10:1.0.2 - http://software.clapper.org/grizzled-slf4j/) - Copyright (c) 2010 Brian M. Clapper
  - ParaNamer (com.thoughtworks.paranamer:paranamer:2.3 - https://github.com/paul-hammant/paranamer) - Copyright (c) 2006 Paul Hammant & ThoughtWorks Inc
+ - JLine (jline:jline:0.9.94 - http://jline.sourceforge.net/) - Copyright (c) 2002-2006, Marc Prud'hommeaux <mw...@cornell.edu>
 
 
 (Below is the 3-clause BSD license)

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-dist/src/main/flink-bin/NOTICE
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/NOTICE b/flink-dist/src/main/flink-bin/NOTICE
index 7b0fe72..19af75f 100644
--- a/flink-dist/src/main/flink-bin/NOTICE
+++ b/flink-dist/src/main/flink-bin/NOTICE
@@ -52,6 +52,28 @@ available from http://www.digip.org/jansson/.
 
 
 -----------------------------------------------------------------------
+                           Apache ZooKeeper
+-----------------------------------------------------------------------
+
+Apache ZooKeeper
+Copyright 2009-2014 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+
+-----------------------------------------------------------------------
+                           Apache Curator
+-----------------------------------------------------------------------
+
+Apache Curator
+Copyright 2013-2014 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+
+-----------------------------------------------------------------------
                            Apache Sling
 -----------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index 1d31493..04afa02 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -94,7 +94,7 @@ KEY_ENV_LOG_MAX="env.log.max"
 KEY_ENV_JAVA_HOME="env.java.home"
 KEY_ENV_JAVA_OPTS="env.java.opts"
 KEY_ENV_SSH_OPTS="env.ssh.opts"
-KEY_ZK_QUORUM="ha.zookeeper.quorum"
+KEY_RECOVERY_MODE="recovery.mode"
 KEY_ZK_HEAP_MB="zookeeper.heap.mb"
 
 ########################################################################################################################
@@ -205,8 +205,8 @@ if [ -z "${ZK_HEAP}" ]; then
     ZK_HEAP=$(readFromConfig ${KEY_ZK_HEAP_MB} 0 "${YAML_CONF}")
 fi
 
-if [ -z "${ZK_QUORUM}" ]; then
-    ZK_QUORUM=$(readFromConfig ${KEY_ZK_QUORUM} "" "${YAML_CONF}")
+if [ -z "${RECOVERY_MODE}" ]; then
+    RECOVERY_MODE=$(readFromConfig ${KEY_RECOVERY_MODE} "standalone" "${YAML_CONF}")
 fi
 
 # Arguments for the JVM. Used for job and task manager JVMs.
@@ -268,13 +268,18 @@ readMasters() {
     fi
 
     MASTERS=()
+    WEBUIPORTS=()
 
     GOON=true
     while $GOON; do
         read line || GOON=false
-        HOST=$( extractHostName $line)
-        if [ -n "$HOST" ]; then
+        HOSTWEBUIPORT=$( extractHostName $line)
+
+        if [ -n "$HOSTWEBUIPORT" ]; then
+            HOST=$(echo $HOSTWEBUIPORT | cut -f1 -d:)
+            WEBUIPORT=$(echo $HOSTWEBUIPORT | cut -f2 -d:)
             MASTERS+=(${HOST})
+            WEBUIPORTS+=(${WEBUIPORT})
         fi
     done < "$MASTERS_FILE"
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-dist/src/main/flink-bin/bin/jobmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/jobmanager.sh b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
index 450d36b..c89f53a 100755
--- a/flink-dist/src/main/flink-bin/bin/jobmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
@@ -18,12 +18,13 @@
 ################################################################################
 
 # Start/stop a Flink JobManager.
-USAGE="Usage: jobmanager.sh (start (local|cluster) [batch|streaming] [host])|stop|stop-all)"
+USAGE="Usage: jobmanager.sh (start (local|cluster) [batch|streaming] [host] [webui-port])|stop|stop-all)"
 
 STARTSTOP=$1
 EXECUTIONMODE=$2
 STREAMINGMODE=$3
 HOST=$4 # optional when starting multiple instances
+WEBUIPORT=$5 # optional when starting multiple instances
 
 bin=`dirname "$0"`
 bin=`cd "$bin"; pwd`
@@ -65,6 +66,10 @@ if [[ $STARTSTOP == "start" ]]; then
     if [ ! -z $HOST ]; then
         args="${args} --host $HOST"
     fi
+
+    if [ ! -z $WEBUIPORT ]; then
+        args="${args} --webui-port $WEBUIPORT"
+    fi
 fi
 
 ${bin}/flink-daemon.sh $STARTSTOP jobmanager "${args}"

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-dist/src/main/flink-bin/bin/start-cluster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-cluster.sh b/flink-dist/src/main/flink-bin/bin/start-cluster.sh
index 429285e..1fdd885 100755
--- a/flink-dist/src/main/flink-bin/bin/start-cluster.sh
+++ b/flink-dist/src/main/flink-bin/bin/start-cluster.sh
@@ -32,21 +32,26 @@ bin=`cd "$bin"; pwd`
 . "$bin"/config.sh
 
 # Start the JobManager instance(s)
-if [[ -z $ZK_QUORUM ]]; then
-    echo "Starting cluster (${STREAMING_MODE} mode)."
-
-    # Start single JobManager on this machine
-    "$bin"/jobmanager.sh start cluster ${STREAMING_MODE}
-else
+shopt -s nocasematch
+if [[ $RECOVERY_MODE == "zookeeper" ]]; then
     # HA Mode
     readMasters
 
     echo "Starting HA cluster (${STREAMING_MODE} mode) with ${#MASTERS[@]} masters and ${#ZK_QUORUM[@]} peers in ZooKeeper quorum."
 
-    for master in ${MASTERS[@]}; do
-        ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l $FLINK_BIN_DIR/jobmanager.sh start cluster ${STREAMING_MODE} ${master} &"
+    for ((i=0;i<${#MASTERS[@]};++i)); do
+        master=${MASTERS[i]}
+        webuiport=${WEBUIPORTS[i]}
+        ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l $FLINK_BIN_DIR/jobmanager.sh start cluster ${STREAMING_MODE} ${master} ${webuiport} &"
     done
+
+else
+    echo "Starting cluster (${STREAMING_MODE} mode)."
+
+    # Start single JobManager on this machine
+    "$bin"/jobmanager.sh start cluster ${STREAMING_MODE}
 fi
+shopt -u nocasematch
 
 # Start TaskManager instance(s)
 readSlaves

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/stop-cluster.sh b/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
index 17a5daf..d6dd572 100755
--- a/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
+++ b/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
@@ -30,13 +30,16 @@ for slave in ${SLAVES[@]}; do
 done
 
 # Stop JobManager instance(s)
-if [[ -z $ZK_QUORUM ]]; then
-    "$bin"/jobmanager.sh stop
-else
-	# HA Mode
+shopt -s nocasematch
+if [[ $RECOVERY_MODE == "zookeeper" ]]; then
+    # HA Mode
     readMasters
 
     for master in ${MASTERS[@]}; do
         ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l $bin/jobmanager.sh stop &"
     done
+
+else
+	  "$bin"/jobmanager.sh stop
 fi
+shopt -u nocasematch

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
index a017f3a..4efb7ad 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
@@ -23,6 +23,8 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -30,7 +32,7 @@ import scala.concurrent.duration.FiniteDuration;
 import java.util.WeakHashMap;
 
 /**
- * Gateway to obtaining an {@link ExecutionGraph} from a source, like JobManager or Archiver.
+ * Gateway for obtaining an {@link ExecutionGraph} from a source, such as the JobManager or the Archive.
  * <p>
  * The holder will cache the ExecutionGraph behind a weak reference, which will be cleared
  * at some point once no one else is pointing to the ExecutionGraph.
@@ -38,27 +40,34 @@ import java.util.WeakHashMap;
  * stay valid.
  */
 public class ExecutionGraphHolder {
-	
-	private final ActorGateway source;
+
+	private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphHolder.class);
+
+	/** Retrieves the current leading JobManager and its corresponding archive */
+	private final JobManagerArchiveRetriever retriever;
 	
 	private final FiniteDuration timeout;
 	
 	private final WeakHashMap<JobID, ExecutionGraph> cache = new WeakHashMap<JobID, ExecutionGraph>();
-	
-	
-	public ExecutionGraphHolder(ActorGateway source) {
-		this(source, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
+
+	public ExecutionGraphHolder(JobManagerArchiveRetriever retriever) {
+		this(retriever, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
 	}
 
-	public ExecutionGraphHolder(ActorGateway source, FiniteDuration timeout) {
-		if (source == null || timeout == null) {
+	public ExecutionGraphHolder(JobManagerArchiveRetriever retriever, FiniteDuration timeout) {
+		if (retriever == null || timeout == null) {
 			throw new NullPointerException();
 		}
-		this.source = source;
+		this.retriever = retriever;
 		this.timeout = timeout;
 	}
-	
-	
+
+	/**
+	 * Retrieves the execution graph with {@link JobID} jid or null if it cannot be found.
+	 *
+	 * @param jid jobID of the execution graph to be retrieved
+	 * @return the retrieved execution graph or null if it is not retrievable
+	 */
 	public ExecutionGraph getExecutionGraph(JobID jid) {
 		ExecutionGraph cached = cache.get(jid);
 		if (cached != null) {
@@ -66,19 +75,25 @@ public class ExecutionGraphHolder {
 		}
 		
 		try {
-			Future<Object> future = source.ask(new JobManagerMessages.RequestJob(jid), timeout);
-			Object result = Await.result(future, timeout);
-			if (result instanceof JobManagerMessages.JobNotFound) {
+			ActorGateway jobManager = retriever.getJobManagerGateway();
+
+			if (jobManager != null) {
+
+				Future<Object> future = jobManager.ask(new JobManagerMessages.RequestJob(jid), timeout);
+				Object result = Await.result(future, timeout);
+				if (result instanceof JobManagerMessages.JobNotFound) {
+					return null;
+				} else if (result instanceof JobManagerMessages.JobFound) {
+					ExecutionGraph eg = ((JobManagerMessages.JobFound) result).executionGraph();
+					cache.put(jid, eg);
+					return eg;
+				} else {
+					throw new RuntimeException("Unknown response from JobManager / Archive: " + result);
+				}
+			} else {
+				LOG.warn("No connection to the leading JobManager.");
 				return null;
 			}
-			else if (result instanceof JobManagerMessages.JobFound) {
-				ExecutionGraph eg = ((JobManagerMessages.JobFound) result).executionGraph();
-				cache.put(jid, eg);
-				return eg;
-			}
-			else {
-				throw new RuntimeException("Unknown response from JobManager / Archive: " + result);
-			}
 		}
 		catch (Exception e) {
 			throw new RuntimeException("Error requesting execution graph", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerArchiveRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerArchiveRetriever.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerArchiveRetriever.java
new file mode 100644
index 0000000..91c9ad5
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerArchiveRetriever.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+
+/**
+ * Retrieves and stores the actor gateway to the current leading JobManager and its archive. In
+ * case of an error, the {@link WebRuntimeMonitor} with which this instance is associated will be
+ * stopped.
+ */
+public class JobManagerArchiveRetriever implements LeaderRetrievalListener {
+
+	private static final Logger LOG = LoggerFactory.getLogger(JobManagerArchiveRetriever.class);
+
+	private final ActorSystem actorSystem;
+	private final FiniteDuration lookupTimeout;
+	private final FiniteDuration timeout;
+	private final WebMonitor webMonitor;
+
+	/** The gateways below are written and read concurrently, hence declared volatile. */
+	private volatile ActorGateway jobManagerGateway;
+	private volatile ActorGateway archiveGateway;
+
+	public JobManagerArchiveRetriever(
+			WebMonitor webMonitor,
+			ActorSystem actorSystem,
+			FiniteDuration lookupTimeout,
+			FiniteDuration timeout) {
+		this.webMonitor = webMonitor;
+		this.actorSystem = actorSystem;
+		this.lookupTimeout = lookupTimeout;
+		this.timeout = timeout;
+	}
+
+	public ActorGateway getJobManagerGateway() {
+		return jobManagerGateway;
+	}
+
+	public ActorGateway getArchiveGateway() {
+		return archiveGateway;
+	}
+
+
+	@Override
+	public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
+		if (leaderAddress != null && !leaderAddress.equals("")) {
+			try {
+				ActorRef jobManager = AkkaUtils.getActorRef(
+						leaderAddress,
+						actorSystem,
+						lookupTimeout);
+				jobManagerGateway = new AkkaActorGateway(jobManager, leaderSessionID);
+
+				Future<Object> archiveFuture = jobManagerGateway.ask(
+						JobManagerMessages.getRequestArchive(),
+						timeout);
+
+				ActorRef archive = ((JobManagerMessages.ResponseArchive) Await.result(
+						archiveFuture,
+						timeout)
+				).actor();
+
+				archiveGateway = new AkkaActorGateway(archive, leaderSessionID);
+			} catch (Exception e) {
+				handleError(e);
+			}
+		}
+	}
+
+	@Override
+	public void handleError(Exception exception) {
+		LOG.error("Received error from LeaderRetrievalService.", exception);
+
+		try{
+			// stop associated webMonitor
+			webMonitor.stop();
+		} catch (Exception e) {
+			LOG.error("Error while stopping the web server due to a LeaderRetrievalService error.", e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 006d18d..4c38cae 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.webmonitor;
 
+import akka.actor.ActorSystem;
+import com.google.common.base.Preconditions;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
@@ -33,7 +35,8 @@ import io.netty.handler.stream.ChunkedWriteHandler;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
 import org.apache.flink.runtime.webmonitor.handlers.ExecutionPlanHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler;
@@ -67,7 +70,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 	public static final FiniteDuration DEFAULT_REQUEST_TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
 	
 	public static final long DEFAULT_REFRESH_INTERVAL = 5000;
-	
+
 	/** Logger for web frontend startup / shutdown messages */
 	private static final Logger LOG = LoggerFactory.getLogger(WebRuntimeMonitor.class);
 	
@@ -77,22 +80,36 @@ public class WebRuntimeMonitor implements WebMonitor {
 	// ------------------------------------------------------------------------
 	
 	private final Object startupShutdownLock = new Object();
-	
-	private final Router router;
+
+	private final LeaderRetrievalService leaderRetrievalService;
+
+	/** LeaderRetrievalListener which stores the currently leading JobManager and its archive */
+	private final JobManagerArchiveRetriever retriever;
 
 	private final int configuredPort;
 
+	private final Router router;
+
 	private ServerBootstrap bootstrap;
 	
 	private Channel serverChannel;
 
-	
-	public WebRuntimeMonitor(Configuration config, ActorGateway jobManager, ActorGateway archive) throws IOException {
+	// ------------------------------------------------------------------------
+
+	public WebRuntimeMonitor(
+			Configuration config,
+			LeaderRetrievalService leaderRetrievalService,
+			ActorSystem actorSystem)
+		throws IOException {
+		Preconditions.checkNotNull(config);
+		this.leaderRetrievalService = Preconditions.checkNotNull(leaderRetrievalService);
+
 		// figure out where our static contents is
 		final String configuredWebRoot = config.getString(ConfigConstants.JOB_MANAGER_WEB_DOC_ROOT_KEY, null);
 		final String flinkRoot = config.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, null);
-		
+
 		final File webRootDir;
+		
 		if (configuredWebRoot != null) {
 			webRootDir = new File(configuredWebRoot);
 		}
@@ -113,22 +130,27 @@ public class WebRuntimeMonitor implements WebMonitor {
 		
 		// port configuration
 		this.configuredPort = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
-												ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
+				ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
 		if (this.configuredPort < 0) {
 			throw new IllegalArgumentException("Web frontend port is invalid: " + this.configuredPort);
 		}
-		
-		ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder(jobManager);
-		
+
+		FiniteDuration timeout = AkkaUtils.getTimeout(config);
+		FiniteDuration lookupTimeout = AkkaUtils.getTimeout(config);
+
+		retriever = new JobManagerArchiveRetriever(this, actorSystem, lookupTimeout, timeout);
+
+		ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder(retriever);
+
 		router = new Router()
 			// config how to interact with this web server
 			.GET("/config", handler(new RequestConfigHandler(DEFAULT_REFRESH_INTERVAL)))
-			
+
 			// the overview - how many task managers, slots, free slots, ...
-			.GET("/overview", handler(new RequestOverviewHandler(jobManager)))
+			.GET("/overview", handler(new RequestOverviewHandler(retriever)))
 
 			// currently running jobs
-			.GET("/jobs", handler(new RequestJobIdsHandler(jobManager)))
+			.GET("/jobs", handler(new RequestJobIdsHandler(retriever)))
 			.GET("/jobs/:jobid", handler(new JobSummaryHandler(currentGraphs)))
 			.GET("/jobs/:jobid/vertices", handler(new JobVerticesOverviewHandler(currentGraphs)))
 			.GET("/jobs/:jobid/plan", handler(new ExecutionPlanHandler(currentGraphs)))
@@ -137,12 +159,10 @@ public class WebRuntimeMonitor implements WebMonitor {
 //			.GET("/running/:jobid/:jobvertex", handler(new ExecutionPlanHandler(currentGraphs)))
 
 			// the handler for the legacy requests
-			.GET("/jobsInfo", new JobManagerInfoHandler(jobManager, archive, DEFAULT_REQUEST_TIMEOUT))
-					
+			.GET("/jobsInfo", new JobManagerInfoHandler(retriever, DEFAULT_REQUEST_TIMEOUT))
+
 			// this handler serves all the static contents
 			.GET("/:*", new StaticFileServerHandler(webRootDir));
-
-		
 	}
 
 	@Override
@@ -151,13 +171,13 @@ public class WebRuntimeMonitor implements WebMonitor {
 			if (this.bootstrap != null) {
 				throw new IllegalStateException("The server has already been started");
 			}
-			
+
+			final Handler handler = new Handler(router);
+
 			ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
-	
+
 				@Override
 				protected void initChannel(SocketChannel ch) {
-					Handler handler = new Handler(router);
-					
 					ch.pipeline()
 						.addLast(new HttpServerCodec())
 						.addLast(new HttpObjectAggregator(65536))
@@ -168,7 +188,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 			
 			NioEventLoopGroup bossGroup   = new NioEventLoopGroup(1);
 			NioEventLoopGroup workerGroup = new NioEventLoopGroup();
-	
+
 			this.bootstrap = new ServerBootstrap();
 			this.bootstrap
 					.group(bossGroup, workerGroup)
@@ -177,18 +197,22 @@ public class WebRuntimeMonitor implements WebMonitor {
 	
 			Channel ch = this.bootstrap.bind(configuredPort).sync().channel();
 			this.serverChannel = ch;
-			
+
 			InetSocketAddress bindAddress = (InetSocketAddress) ch.localAddress();
 			String address = bindAddress.getAddress().getHostAddress();
 			int port = bindAddress.getPort();
 			
 			LOG.info("Web frontend listening at " + address + ':' + port);
+
+			leaderRetrievalService.start(retriever);
 		}
 	}
 	
 	@Override
 	public void stop() throws Exception {
 		synchronized (startupShutdownLock) {
+			leaderRetrievalService.stop();
+
 			if (this.serverChannel != null) {
 				this.serverChannel.close().awaitUninterruptibly();
 				this.serverChannel = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
index aa1a39f..8a177f4 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
+import org.apache.flink.runtime.webmonitor.JobManagerArchiveRetriever;
 import org.apache.flink.runtime.webmonitor.JsonFactory;
 import org.apache.flink.runtime.webmonitor.WebRuntimeMonitor;
 
@@ -37,19 +38,19 @@ import java.util.Map;
  */
 public class RequestJobIdsHandler implements RequestHandler, RequestHandler.JsonResponse {
 	
-	private final ActorGateway target;
+	private final JobManagerArchiveRetriever retriever;
 	
 	private final FiniteDuration timeout;
 	
-	public RequestJobIdsHandler(ActorGateway target) {
-		this(target, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
+	public RequestJobIdsHandler(JobManagerArchiveRetriever retriever) {
+		this(retriever, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
 	}
 	
-	public RequestJobIdsHandler(ActorGateway target, FiniteDuration timeout) {
-		if (target == null || timeout == null) {
+	public RequestJobIdsHandler(JobManagerArchiveRetriever retriever, FiniteDuration timeout) {
+		if (retriever == null || timeout == null) {
 			throw new NullPointerException();
 		}
-		this.target = target;
+		this.retriever = retriever;
 		this.timeout = timeout;
 	}
 	
@@ -57,9 +58,15 @@ public class RequestJobIdsHandler implements RequestHandler, RequestHandler.Json
 	public String handleRequest(Map<String, String> params) throws Exception {
 		// we need no parameters, get all requests
 		try {
-			Future<Object> future = target.ask(RequestJobsWithIDsOverview.getInstance(), timeout);
-			JobsWithIDsOverview result = (JobsWithIDsOverview) Await.result(future, timeout);
-			return JsonFactory.generateJobsOverviewJSON(result);
+			ActorGateway jobManager = retriever.getJobManagerGateway();
+
+			if (jobManager != null) {
+				Future<Object> future = jobManager.ask(RequestJobsWithIDsOverview.getInstance(), timeout);
+				JobsWithIDsOverview result = (JobsWithIDsOverview) Await.result(future, timeout);
+				return JsonFactory.generateJobsOverviewJSON(result);
+			} else {
+				throw new Exception("No connection to the leading JobManager.");
+			}
 		}
 		catch (Exception e) {
 			throw new RuntimeException("Failed to fetch list of all running jobs: " + e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java
index c2c00c7..ce30122 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.webmonitor.RequestStatusWithJobIDsOverview;
 import org.apache.flink.runtime.messages.webmonitor.StatusWithJobIDsOverview;
+import org.apache.flink.runtime.webmonitor.JobManagerArchiveRetriever;
 import org.apache.flink.runtime.webmonitor.JsonFactory;
 import org.apache.flink.runtime.webmonitor.WebRuntimeMonitor;
 
@@ -36,29 +37,35 @@ import java.util.Map;
  */
 public class RequestOverviewHandler implements  RequestHandler, RequestHandler.JsonResponse {
 	
-	private final ActorGateway jobManager;
+	private final JobManagerArchiveRetriever retriever;
 	
 	private final FiniteDuration timeout;
 	
 	
-	public RequestOverviewHandler(ActorGateway jobManager) {
-		this(jobManager, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
+	public RequestOverviewHandler(JobManagerArchiveRetriever retriever) {
+		this(retriever, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
 	}
 	
-	public RequestOverviewHandler(ActorGateway jobManager, FiniteDuration timeout) {
-		if (jobManager == null || timeout == null) {
+	public RequestOverviewHandler(JobManagerArchiveRetriever retriever, FiniteDuration timeout) {
+		if (retriever == null || timeout == null) {
 			throw new NullPointerException();
 		}
-		this.jobManager = jobManager;
+		this.retriever = retriever;
 		this.timeout = timeout;
 	}
 	
 	@Override
 	public String handleRequest(Map<String, String> params) throws Exception {
 		try {
-			Future<Object> future = jobManager.ask(RequestStatusWithJobIDsOverview.getInstance(), timeout);
-			StatusWithJobIDsOverview result = (StatusWithJobIDsOverview) Await.result(future, timeout);
-			return JsonFactory.generateOverviewWithJobIDsJSON(result);
+			ActorGateway jobManager = retriever.getJobManagerGateway();
+
+			if (jobManager != null) {
+				Future<Object> future = jobManager.ask(RequestStatusWithJobIDsOverview.getInstance(), timeout);
+				StatusWithJobIDsOverview result = (StatusWithJobIDsOverview) Await.result(future, timeout);
+				return JsonFactory.generateOverviewWithJobIDsJSON(result);
+			} else {
+				throw new Exception("No connection to the leading job manager.");
+			}
 		}
 		catch (Exception e) {
 			throw new Exception("Failed to fetch the status overview: " + e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
index 9b52736..3f1842b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
@@ -48,6 +48,7 @@ import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneou
 import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsNotFound;
 import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResultsStringified;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.webmonitor.JobManagerArchiveRetriever;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.apache.flink.util.StringUtils;
@@ -75,14 +76,15 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
 	private static final Charset ENCODING = Charset.forName("UTF-8");
 
 	/** Underlying JobManager */
-	private final ActorGateway jobmanager;
-	private final ActorGateway archive;
+	private final JobManagerArchiveRetriever retriever;
 	private final FiniteDuration timeout;
 
+	private ActorGateway jobmanager;
+	private ActorGateway archive;
 
-	public JobManagerInfoHandler(ActorGateway jobmanager, ActorGateway archive, FiniteDuration timeout) {
-		this.jobmanager = jobmanager;
-		this.archive = archive;
+
+	public JobManagerInfoHandler(JobManagerArchiveRetriever retriever, FiniteDuration timeout) {
+		this.retriever = retriever;
 		this.timeout = timeout;
 	}
 
@@ -90,6 +92,18 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
 	protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
 		DefaultFullHttpResponse response;
 		try {
+			jobmanager = retriever.getJobManagerGateway();
+
+			if (jobmanager == null) {
+				throw new Exception("No connection to leading JobManager.");
+			}
+
+			archive = retriever.getArchiveGateway();
+
+			if (archive == null) {
+				throw new Exception("No connection to leading JobManager.");
+			}
+
 			String result = handleRequest(routed);
 			byte[] bytes = result.getBytes(ENCODING);
 
@@ -114,6 +128,8 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
 	
 	@SuppressWarnings("unchecked")
 	private String handleRequest(Routed routed) throws Exception {
+
+
 		if ("archive".equals(routed.queryParam("get"))) {
 			Future<Object> response = archive.ask(ArchiveMessages.getRequestArchivedJobs(), timeout);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/runner/TestRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/runner/TestRunner.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/runner/TestRunner.java
index 9a9b6ba..a2978a1 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/runner/TestRunner.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/runner/TestRunner.java
@@ -45,14 +45,15 @@ public class TestRunner {
 
 		// start the cluster with the runtime monitor
 		Configuration configuration = new Configuration();
-		configuration.setBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, true);
+		configuration.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
 		configuration.setBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, true);
 		configuration.setString(ConfigConstants.JOB_MANAGER_WEB_DOC_ROOT_KEY,
-			"/data/repositories/flink/flink-dist/target/flink-0.10-SNAPSHOT-bin/flink-0.10-SNAPSHOT/resources/web-runtime-monitor");
+			"flink-dist/target/flink-0.10-SNAPSHOT-bin/flink-0.10-SNAPSHOT/resources/web-runtime-monitor");
 		
 		LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(configuration, false);
+		cluster.start();
 
-		final int port = cluster.getJobManagerRPCPort();
+		final int port = cluster.getLeaderRPCPort();
 		runWordCount(port);
 		runWebLogAnalysisExample(port);
 		runWordCount(port);

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime-web/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/resources/log4j-test.properties b/flink-runtime-web/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2b9292a
--- /dev/null
+++ b/flink-runtime-web/src/test/resources/log4j-test.properties
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+log4j.rootLogger=OFF, console
+
+# -----------------------------------------------------------------------------
+# Console (use 'console')
+# -----------------------------------------------------------------------------
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.flink.util.MavenForkNumberPrefixLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# -----------------------------------------------------------------------------
+# File (use 'file')
+# -----------------------------------------------------------------------------
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.file=${log.dir}/{$mvn.forkNumber}.log
+log4j.appender.file.append=false
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime-web/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/resources/logback-test.xml b/flink-runtime-web/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..9d4f644
--- /dev/null
+++ b/flink-runtime-web/src/test/resources/logback-test.xml
@@ -0,0 +1,42 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] [%X{sourceThread} - %X{akkaSource}] %-5level %logger{60} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+
+    <!-- The following loggers are disabled during tests, because many tests deliberately
+         throw errors to test failing scenarios. Logging those would overflow the log. -->
+         <!---->
+    <logger name="org.apache.flink.runtime.operators.DataSinkTask" level="OFF"/>
+    <logger name="org.apache.flink.runtime.operators.RegularPactTask" level="OFF"/>
+    <logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/>
+    <logger name="org.apache.flink.runtime.taskmanager.Task" level="OFF"/>
+    <logger name="org.apache.flink.runtime.jobmanager.JobManager" level="OFF"/>
+    <logger name="org.apache.flink.runtime.testingUtils" level="OFF"/>
+    <logger name="org.apache.flink.runtime.executiongraph.ExecutionGraph" level="OFF"/>
+    <logger name="org.apache.flink.runtime.jobmanager.EventCollector" level="OFF"/>
+    <logger name="org.apache.flink.runtime.instance.InstanceManager" level="OFF"/>
+</configuration>

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 6dc8d42..c5706a1 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -196,6 +196,12 @@ under the License.
 			<groupId>org.apache.curator</groupId>
 			<artifactId>curator-recipes</artifactId>
 		</dependency>
+		
+		<dependency>
+			<groupId>org.apache.curator</groupId>
+			<artifactId>curator-test</artifactId>
+		</dependency>
+
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
index 0623862..7128286 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
@@ -23,7 +23,6 @@ import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage
 import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
 
 import java.util.UUID;
 
@@ -82,28 +81,27 @@ public abstract class FlinkUntypedActor extends UntypedActor {
 	private void handleLeaderSessionID(Object message) throws Exception {
 		if(message instanceof LeaderSessionMessage) {
 			LeaderSessionMessage msg = (LeaderSessionMessage) message;
+			UUID expectedID = getLeaderSessionID();
+			UUID actualID = msg.leaderSessionID();
 
-			if(msg.leaderSessionID().isDefined() && getLeaderSessionID().isDefined()) {
-				if(getLeaderSessionID().equals(msg.leaderSessionID())) {
-					// finally call method to handle message
-					handleMessage(msg.message());
-				} else {
-					handleDiscardedMessage(msg);
-				}
+			if(expectedID == actualID || (expectedID != null && expectedID.equals(actualID))) {
+				handleMessage(msg.message());
 			} else {
-				handleDiscardedMessage(msg);
+				handleDiscardedMessage(expectedID, msg);
 			}
 		} else if (message instanceof RequiresLeaderSessionID) {
 			throw new Exception("Received a message " + message + " without a leader session " +
-					"ID, even though it requires to have one.");
+					"ID, even though the message requires a leader session ID.");
 		} else {
 			// call method to handle message
 			handleMessage(message);
 		}
 	}
 
-	private void handleDiscardedMessage(Object msg) {
-		LOG.debug("Discard message {} because the leader session ID was not correct.", msg);
+	private void handleDiscardedMessage(UUID expectedLeaderSessionID, LeaderSessionMessage msg) {
+		LOG.warn("Discard message {} because the expected leader session ID {} did not " +
+				"equal the received leader session ID {}.", msg, expectedLeaderSessionID,
+				msg.leaderSessionID());
 	}
 
 	/**
@@ -118,7 +116,7 @@ public abstract class FlinkUntypedActor extends UntypedActor {
 	 * Returns the current leader session ID associcated with this actor.
 	 * @return
 	 */
-	protected abstract Option<UUID> getLeaderSessionID();
+	abstract protected UUID getLeaderSessionID();
 
 	/**
 	 * This method should be called for every outgoing message. It wraps messages which require

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ListeningBehaviour.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ListeningBehaviour.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ListeningBehaviour.java
new file mode 100644
index 0000000..59e3f16
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ListeningBehaviour.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.akka;
+
+/**
+ * Defines the listening behaviour of the JobClientActor and thus the messages
+ * which are sent from the JobManager to the JobClientActor.
+ */
+public enum ListeningBehaviour {
+	DETACHED, // only receive the Acknowledge message about the job submission message
+	EXECUTION_RESULT, // receive additionally the SerializedJobExecutionResult
+	EXECUTION_RESULT_AND_STATE_CHANGES // receive additionally the JobStatusChanged messages
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 626d21f..ef2ef61 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -258,6 +258,10 @@ public class BlobServer extends Thread implements BlobService {
 					LOG.warn("Exception while unregistering BLOB server's cleanup shutdown hook.");
 				}
 			}
+
+			if(LOG.isInfoEnabled()) {
+				LOG.info("Stopped BLOB server at {}:{}", serverSocket.getInetAddress().getHostAddress(), getPort());
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 3681e9e..8f0b19b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -36,7 +36,6 @@ import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -576,7 +575,7 @@ public class CheckpointCoordinator {
 	public ActorGateway createJobStatusListener(
 			ActorSystem actorSystem,
 			long checkpointInterval,
-			Option<UUID> leaderSessionID) {
+			UUID leaderSessionID) {
 		synchronized (lock) {
 			if (shutdown) {
 				throw new IllegalArgumentException("Checkpoint coordinator is shut down");

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
index f65be15..7e32b72 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
@@ -22,7 +22,6 @@ import com.google.common.base.Preconditions;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;
-import scala.Option;
 
 import java.util.UUID;
 
@@ -34,16 +33,17 @@ public class CheckpointCoordinatorDeActivator extends FlinkUntypedActor {
 
 	private final CheckpointCoordinator coordinator;
 	private final long interval;
-	private final Option<UUID> leaderSessionID;
+	private final UUID leaderSessionID;
 	
 	public CheckpointCoordinatorDeActivator(
 			CheckpointCoordinator coordinator,
 			long interval,
-			Option<UUID> leaderSessionID) {
-		Preconditions.checkNotNull(coordinator, "The checkpointCoordinator must not be null.");
-		Preconditions.checkNotNull(leaderSessionID, "The leaderSesssionID must not be null.");
+			UUID leaderSessionID) {
+
+		LOG.info("Create CheckpointCoordinatorDeActivator");
+
+		this.coordinator = Preconditions.checkNotNull(coordinator, "The checkpointCoordinator must not be null.");
 
-		this.coordinator = coordinator;
 		this.interval = interval;
 		this.leaderSessionID = leaderSessionID;
 	}
@@ -67,7 +67,7 @@ public class CheckpointCoordinatorDeActivator extends FlinkUntypedActor {
 	}
 
 	@Override
-	public Option<UUID> getLeaderSessionID() {
+	public UUID getLeaderSessionID() {
 		return leaderSessionID;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 7507643..9d64866 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.JobClientMessages;
@@ -120,6 +121,8 @@ public class JobClient {
 	 * @param jobManagerGateway  Gateway to the JobManager that should execute the job.
 	 * @param jobGraph    JobGraph describing the Flink job
 	 * @param timeout     Timeout for futures
+	 * @param sysoutLogUpdates prints log updates to system out if true
+	 * @param userCodeClassloader class loader to be used for deserialization
 	 * @return The job execution result
 	 * @throws org.apache.flink.runtime.client.JobExecutionException Thrown if the job
 	 *                                                               execution fails.
@@ -236,7 +239,10 @@ public class JobClient {
 		Object result;
 		try {
 			Future<Object> future = jobManagerGateway.ask(
-					new JobManagerMessages.SubmitJob(jobGraph, false),
+				new JobManagerMessages.SubmitJob(
+					jobGraph,
+					ListeningBehaviour.DETACHED // only receive the Acknowledge for the job submission message
+				),
 					timeout);
 			
 			result = Await.result(future, timeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
index 16c6baf..bf747c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
@@ -24,12 +24,12 @@ import akka.actor.Status;
 import akka.actor.Terminated;
 import com.google.common.base.Preconditions;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;
 import org.apache.flink.runtime.messages.JobClientMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.slf4j.Logger;
-import scala.Option;
 
 import java.util.UUID;
 
@@ -44,18 +44,18 @@ public class JobClientActor extends FlinkUntypedActor {
 	private final boolean sysoutUpdates;
 
 	/** leader session ID of the JobManager when this actor was created */
-	private final Option<UUID> leaderSessionID;
+	private final UUID leaderSessionID;
 
 	/** Actor which submits a job to the JobManager via this actor */
 	private ActorRef submitter;
 
 	public JobClientActor(ActorRef jobManager, Logger logger, boolean sysoutUpdates,
-							Option<UUID> leaderSessionID) {
-		
+			UUID leaderSessionID) {
+
 		this.jobManager = Preconditions.checkNotNull(jobManager, "The JobManager ActorRef must not be null.");
 		this.logger = Preconditions.checkNotNull(logger, "The logger must not be null.");
-		this.leaderSessionID = Preconditions.checkNotNull(leaderSessionID, "The leader session ID option must not be null.");
 
+		this.leaderSessionID = leaderSessionID;
 		this.sysoutUpdates = sysoutUpdates;
 	}
 	
@@ -91,7 +91,11 @@ public class JobClientActor extends FlinkUntypedActor {
 
 					this.submitter = getSender();
 					jobManager.tell(
-							decorateMessage(new JobManagerMessages.SubmitJob(jobGraph, true)), getSelf());
+						decorateMessage(
+							new JobManagerMessages.SubmitJob(
+								jobGraph,
+								ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES)),
+							getSelf());
 					
 					// make sure we notify the sender when the connection got lost
 					getContext().watch(jobManager);
@@ -102,8 +106,7 @@ public class JobClientActor extends FlinkUntypedActor {
 				String msg = "Received repeated 'SubmitJobAndWait'";
 				logger.error(msg);
 				getSender().tell(
-						decorateMessage(new Status.Failure(new Exception(msg))),
-						ActorRef.noSender());
+					decorateMessage(new Status.Failure(new Exception(msg))), ActorRef.noSender());
 
 				getContext().unwatch(jobManager);
 				getSelf().tell(decorateMessage(PoisonPill.getInstance()), ActorRef.noSender());
@@ -153,7 +156,7 @@ public class JobClientActor extends FlinkUntypedActor {
 	}
 
 	@Override
-	protected Option<UUID> getLeaderSessionID() {
+	protected UUID getLeaderSessionID() {
 		return leaderSessionID;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 189682b..4cee2f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -556,7 +556,9 @@ public class Execution implements Serializable {
 							partitionId, partitionLocation);
 
 					final UpdatePartitionInfo updateTaskMessage = new UpdateTaskSinglePartitionInfo(
-							consumer.getAttemptId(), partition.getIntermediateResult().getId(), descriptor);
+						consumer.getAttemptId(),
+						partition.getIntermediateResult().getId(),
+						descriptor);
 
 					sendUpdatePartitionInfoRpcCall(consumerSlot, updateTaskMessage);
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 3602372..cde1741 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -51,7 +51,6 @@ import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.Option;
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -326,7 +325,7 @@ public class ExecutionGraph implements Serializable {
 			List<ExecutionJobVertex> verticesToWaitFor,
 			List<ExecutionJobVertex> verticesToCommitTo,
 			ActorSystem actorSystem,
-			Option<UUID> leaderSessionID) {
+			UUID leaderSessionID) {
 		// simple sanity checks
 		if (interval < 10 || checkpointTimeout < 10) {
 			throw new IllegalArgumentException();

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/instance/ActorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/ActorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/ActorGateway.java
index fe4a1cd..a82debb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/ActorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/ActorGateway.java
@@ -19,11 +19,11 @@
 package org.apache.flink.runtime.instance;
 
 import akka.actor.ActorRef;
-import scala.Option;
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.Serializable;
 import java.util.UUID;
 
 /**
@@ -31,7 +31,7 @@ import java.util.UUID;
  *
  * It allows to avoid direct interaction with an ActorRef.
  */
-public interface ActorGateway {
+public interface ActorGateway extends Serializable {
 
 	/**
 	 * Sends a message asynchronously and returns its response. The response to the message is
@@ -99,9 +99,9 @@ public interface ActorGateway {
 	ActorRef actor();
 
 	/**
-	 * Returns the leaderSessionID associated with the remote actor or None.
+	 * Returns the leaderSessionID associated with the remote actor or null.
 	 *
-	 * @return Leader session ID if its associated with this gateway, otherwise None
+	 * @return Leader session ID if it is associated with this gateway, otherwise null
 	 */
-	Option<UUID> leaderSessionID();
+	UUID leaderSessionID();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java
index ea55458..c00e7fe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java
@@ -21,31 +21,33 @@ package org.apache.flink.runtime.instance;
 import akka.actor.ActorRef;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
-import org.apache.flink.runtime.LeaderSessionMessageDecorator;
-import org.apache.flink.runtime.MessageDecorator;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import scala.Option;
+import org.apache.flink.runtime.messages.LeaderSessionMessageDecorator;
+import org.apache.flink.runtime.messages.MessageDecorator;
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.Serializable;
 import java.util.UUID;
 
 /**
  * Concrete {@link ActorGateway} implementation which uses Akka to communicate with remote actors.
  */
-public class AkkaActorGateway implements ActorGateway {
+public class AkkaActorGateway implements ActorGateway, Serializable {
+
+	private static final long serialVersionUID = 42l;
 
 	// ActorRef of the remote instance
 	private final ActorRef actor;
 
 	// Associated leader session ID, which is used for RequiresLeaderSessionID messages
-	private final Option<UUID> leaderSessionID;
+	private final UUID leaderSessionID;
 
 	// Decorator for messages
 	private final MessageDecorator decorator;
 
-	public AkkaActorGateway(ActorRef actor, Option<UUID> leaderSessionID) {
+	public AkkaActorGateway(ActorRef actor, UUID leaderSessionID) {
 		this.actor = actor;
 		this.leaderSessionID = leaderSessionID;
 		// we want to wrap RequiresLeaderSessionID messages in a LeaderSessionMessage
@@ -151,7 +153,7 @@ public class AkkaActorGateway implements ActorGateway {
 	}
 
 	@Override
-	public Option<UUID> leaderSessionID() {
+	public UUID leaderSessionID() {
 		return leaderSessionID;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index 0a6b4d0..03213e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -30,7 +30,6 @@ import java.util.UUID;
 import akka.actor.ActorRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
 
 /**
  * Simple manager that keeps track of which TaskManager are available and alive.
@@ -143,7 +142,7 @@ public class InstanceManager {
 			InstanceConnectionInfo connectionInfo,
 			HardwareDescription resources,
 			int numberOfSlots,
-			Option<UUID> leaderSessionID){
+			UUID leaderSessionID){
 		synchronized(this.lock){
 			if (this.isShutdown) {
 				throw new IllegalStateException("InstanceManager is shut down.");
@@ -177,8 +176,14 @@ public class InstanceManager {
 			totalNumberOfAliveTaskSlots += numberOfSlots;
 
 			if (LOG.isInfoEnabled()) {
-				LOG.info(String.format("Registered TaskManager at %s (%s) as %s. Current number of registered hosts is %d.",
-						connectionInfo.getHostname(), taskManager.path(), id, registeredHostsById.size()));
+				LOG.info(String.format("Registered TaskManager at %s (%s) as %s. " +
+								"Current number of registered hosts is %d. " +
+								"Current number of alive task slots is %d.",
+						connectionInfo.getHostname(),
+						taskManager.path(),
+						id,
+						registeredHostsById.size(),
+						totalNumberOfAliveTaskSlots));
 			}
 
 			host.reportHeartBeat();
@@ -190,13 +195,22 @@ public class InstanceManager {
 		}
 	}
 
-	public void unregisterTaskManager(ActorRef taskManager){
+	/**
+	 * Unregisters the TaskManager with the given {@link ActorRef}. Unregistering means to mark
+	 * the given instance as dead and notify {@link InstanceListener} about the dead instance.
+	 *
+	 * @param taskManager TaskManager which is about to be marked dead.
+	 */
+	public void unregisterTaskManager(ActorRef taskManager, boolean terminated){
 		Instance host = registeredHostsByConnection.get(taskManager);
 
 		if(host != null){
 			registeredHostsByConnection.remove(taskManager);
 			registeredHostsById.remove(host.getId());
-			deadHosts.add(taskManager);
+
+			if (terminated) {
+				deadHosts.add(taskManager);
+			}
 
 			host.markDead();
 
@@ -204,12 +218,30 @@ public class InstanceManager {
 
 			notifyDeadInstance(host);
 
-			LOG.info("Unregistered task manager " + taskManager.path().address() + ". Number of " +
+			LOG.info("Unregistered task manager " + taskManager.path() + ". Number of " +
 					"registered task managers " + getNumberOfRegisteredTaskManagers() + ". Number" +
 					" of available slots " + getTotalNumberOfSlots() + ".");
 		}
 	}
 
+	/**
+	 * Unregisters all currently registered TaskManagers from the InstanceManager.
+	 */
+	public void unregisterAllTaskManagers() {
+		for(Instance instance: registeredHostsById.values()) {
+			deadHosts.add(instance.getActorGateway().actor());
+
+			instance.markDead();
+
+			totalNumberOfAliveTaskSlots -= instance.getTotalNumberOfSlots();
+
+			notifyDeadInstance(instance);
+		}
+
+		registeredHostsById.clear();
+		registeredHostsByConnection.clear();
+	}
+
 	public boolean isRegistered(ActorRef taskManager) {
 		return registeredHostsByConnection.containsKey(taskManager);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 539dbc0..9eb4a9a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
+import org.apache.flink.runtime.messages.TaskMessages.FailTask;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManager;
@@ -51,7 +52,6 @@ import java.io.IOException;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
-import static org.apache.flink.runtime.messages.TaskMessages.FailTask;
 
 /**
  * Network I/O components of each {@link TaskManager} instance. The network environment contains

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
index edfa87b..1202499 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
@@ -35,6 +35,8 @@ public class JobManagerCliOptions {
 
 	private String host;
 
+	private int webUIPort = -1;
+
 	// ------------------------------------------------------------------------
 
 	public String getConfigDir() {
@@ -86,4 +88,12 @@ public class JobManagerCliOptions {
 	public void setHost(String host) {
 		this.host = checkNotNull(host);
 	}
+
+	public int getWebUIPort() {
+		return webUIPort;
+	}
+
+	public void setWebUIPort(int webUIPort) {
+		this.webUIPort = webUIPort;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
new file mode 100644
index 0000000..90a1147
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+/**
+ * Recovery mode for Flink's cluster execution. Currently supported modes are:
+ *
+ *   - Standalone: No recovery from JobManager failures
+ *   - ZooKeeper: JobManager high availability via ZooKeeper
+ *     ZooKeeper is used to select a leader among a group of JobManagers. The leader
+ *     is responsible for the job execution. Upon failure of the leader, a new leader is elected
+ *     which will take over the responsibilities of the old leader.
+ */
+public enum RecoveryMode {
+	STANDALONE,
+	ZOOKEEPER
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
index 4383b65..21a1f51 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
@@ -22,30 +22,41 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URL;
+import java.util.UUID;
 
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import com.google.common.base.Preconditions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 
 import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
 import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.handler.HandlerCollection;
 import org.eclipse.jetty.server.handler.ResourceHandler;
 import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.handler.HandlerList;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import scala.concurrent.Await;
+import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
  * This class sets up a web-server that contains a web frontend to display information about running jobs.
  * It instantiates and configures an embedded jetty server.
  */
-public class WebInfoServer implements WebMonitor {
+public class WebInfoServer implements WebMonitor, LeaderRetrievalListener {
 
 	/** Web root dir in the jar */
 	private static final String WEB_ROOT_DIR = "web-docs-infoserver";
@@ -56,6 +67,30 @@ public class WebInfoServer implements WebMonitor {
 	/** The jetty server serving all requests. */
 	private final Server server;
 
+	/** Retrieval service for the current leading JobManager */
+	private final LeaderRetrievalService leaderRetrievalService;
+
+	/** ActorSystem used to retrieve the ActorRefs */
+	private final ActorSystem actorSystem;
+
+	/** Collection for the registered jetty handlers */
+	private final HandlerCollection handlers;
+
+	/** Associated configuration */
+	private final Configuration config;
+
+	/** Timeout for the servlets */
+	private final FiniteDuration timeout;
+
+	/** Actor look up timeout */
+	private final FiniteDuration lookupTimeout;
+
+	/** Default jetty handler responsible for serving static content */
+	private final ResourceHandler resourceHandler;
+
+	/** File paths to log dirs */
+	final File[] logDirFiles;
+
 	/** The assigned port where jetty is running. */
 	private int assignedPort = -1;
 
@@ -64,19 +99,23 @@ public class WebInfoServer implements WebMonitor {
 	 * to list all present information concerning the job manager
 	 *
 	 * @param config The Flink configuration.
-	 * @param jobmanager The ActorRef to the JobManager actor
-	 * @param archive The ActorRef to the archive for old jobs
+	 * @param leaderRetrievalService Retrieval service to obtain the current leader
 	 *
 	 * @throws IOException
 	 *         Thrown, if the server setup failed for an I/O related reason.
 	 */
-	public WebInfoServer(Configuration config, ActorGateway jobmanager, ActorGateway archive) throws IOException {
+	public WebInfoServer(
+			Configuration config,
+			LeaderRetrievalService leaderRetrievalService,
+			ActorSystem actorSystem)
+		throws IOException {
 		if (config == null) {
 			throw new IllegalArgumentException("No Configuration has been passed to the web server");
 		}
-		if (jobmanager == null || archive == null) {
-			throw new NullPointerException();
-		}
+
+		this.config = config;
+
+		this.leaderRetrievalService = Preconditions.checkNotNull(leaderRetrievalService);
 
 		// if port == 0, jetty will assign an available port.
 		int port = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
@@ -85,7 +124,10 @@ public class WebInfoServer implements WebMonitor {
 			throw new IllegalArgumentException("Invalid port for the webserver: " + port);
 		}
 
-		final FiniteDuration timeout = AkkaUtils.getTimeout(config);
+		timeout = AkkaUtils.getTimeout(config);
+		lookupTimeout = AkkaUtils.getLookupTimeout(config);
+
+		this.actorSystem = actorSystem;
 
 		// get base path of Flink installation
 		final String basePath = config.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, "");
@@ -99,7 +141,7 @@ public class WebInfoServer implements WebMonitor {
 					"resource " + WEB_ROOT_DIR + " is not included in the jar.");
 		}
 
-		final File[] logDirFiles = new File[logDirPaths.length];
+		logDirFiles = new File[logDirPaths.length];
 		int i = 0;
 		for(String path : logDirPaths) {
 			logDirFiles[i++] = new File(path);
@@ -113,24 +155,16 @@ public class WebInfoServer implements WebMonitor {
 
 		server = new Server(port);
 
-		// ----- the handlers for the servlets -----
-		ServletContextHandler servletContext = new ServletContextHandler(ServletContextHandler.SESSIONS);
-		servletContext.setContextPath("/");
-		servletContext.addServlet(new ServletHolder(new JobManagerInfoServlet(jobmanager, archive, timeout)), "/jobsInfo");
-		servletContext.addServlet(new ServletHolder(new LogfileInfoServlet(logDirFiles)), "/logInfo");
-		servletContext.addServlet(new ServletHolder(new SetupInfoServlet(config, jobmanager, timeout)), "/setupInfo");
-		servletContext.addServlet(new ServletHolder(new MenuServlet()), "/menu");
-
-
 		// ----- the handler serving all the static files -----
-		ResourceHandler resourceHandler = new ResourceHandler();
+		resourceHandler = new ResourceHandler();
 		resourceHandler.setDirectoriesListed(false);
 		resourceHandler.setResourceBase(webRootDir.toExternalForm());
 
 		// ----- add the handlers to the list handler -----
-		HandlerList handlers = new HandlerList();
+
+		// make the HandlerCollection mutable so that we can update it later on
+		handlers = new HandlerCollection(true);
 		handlers.addHandler(resourceHandler);
-		handlers.addHandler(servletContext);
 		server.setHandler(handlers);
 	}
 
@@ -158,12 +192,15 @@ public class WebInfoServer implements WebMonitor {
 		else {
 			LOG.warn("Unable to determine local endpoint of web frontend server");
 		}
+
+		leaderRetrievalService.start(this);
 	}
 
 	/**
 	 * Stop the webserver
 	 */
 	public void stop() throws Exception {
+		leaderRetrievalService.stop();
 		server.stop();
 		assignedPort = -1;
 	}
@@ -171,4 +208,86 @@ public class WebInfoServer implements WebMonitor {
 	public int getServerPort() {
 		return this.assignedPort;
 	}
+
+	@Override
+	public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
+
+		if(leaderAddress != null && !leaderAddress.equals("")) {
+			try {
+				ActorRef jobManager = AkkaUtils.getActorRef(
+					leaderAddress,
+					actorSystem,
+					lookupTimeout);
+				ActorGateway jobManagerGateway = new AkkaActorGateway(jobManager, leaderSessionID);
+
+				Future<Object> archiveFuture = jobManagerGateway.ask(
+					JobManagerMessages.getRequestArchive(),
+					timeout);
+
+				ActorRef archive = ((JobManagerMessages.ResponseArchive) Await.result(
+					archiveFuture,
+					timeout)).actor();
+
+				ActorGateway archiveGateway = new AkkaActorGateway(archive, leaderSessionID);
+
+				updateHandler(jobManagerGateway, archiveGateway);
+			} catch (Exception e) {
+				handleError(e);
+			}
+		}
+	}
+
+	@Override
+	public void handleError(Exception exception) {
+		LOG.error("Received error from LeaderRetrievalService.", exception);
+
+		try{
+			// stop the whole web server
+			stop();
+		} catch (Exception e) {
+			LOG.error("Error while stopping the web server due to a LeaderRetrievalService error.", e);
+		}
+	}
+
+	/**
+	 * Updates the Flink handlers with the current leading JobManager and archive
+	 *
+	 * @param jobManager ActorGateway to the current JobManager leader
+	 * @param archive ActorGateway to the current archive of the leading JobManager
+	 * @throws Exception if starting the new servlet context handler fails
+	 */
+	private void updateHandler(ActorGateway jobManager, ActorGateway archive) throws Exception {
+		// ----- the handlers for the servlets -----
+		ServletContextHandler servletContext = new ServletContextHandler(ServletContextHandler.SESSIONS);
+		servletContext.setContextPath("/");
+		servletContext.addServlet(
+				new ServletHolder(
+						new JobManagerInfoServlet(
+								jobManager,
+								archive,
+								timeout)),
+				"/jobsInfo");
+		servletContext.addServlet(
+				new ServletHolder(
+						new LogfileInfoServlet(
+								logDirFiles)),
+				"/logInfo");
+		servletContext.addServlet(
+				new ServletHolder(
+						new SetupInfoServlet(
+								config,
+								jobManager,
+								timeout)),
+				"/setupInfo");
+		servletContext.addServlet(
+				new ServletHolder(
+						new MenuServlet()),
+				"/menu");
+
+		// replace old handlers with new ones
+		handlers.setHandlers(new Handler[]{resourceHandler, servletContext});
+
+		// start new handler
+		servletContext.start();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderContender.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderContender.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderContender.java
new file mode 100644
index 0000000..dcf0a4e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderContender.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import java.util.UUID;
+
+/**
+ * Interface which has to be implemented to take part in the leader election process of the
+ * {@link LeaderElectionService}.
+ */
+public interface LeaderContender {
+
+	/**
+	 * Callback method which is called by the {@link LeaderElectionService} upon selecting this
+	 * instance as the new leader. The method is called with the new leader session ID.
+	 *
+	 * @param leaderSessionID New leader session ID
+	 */
+	void grantLeadership(UUID leaderSessionID);
+
+	/**
+	 * Callback method which is called by the {@link LeaderElectionService} upon revoking the
+	 * leadership of a former leader. This might happen if multiple contenders have
+	 * been granted leadership.
+	 */
+	void revokeLeadership();
+
+	/**
+	 * Returns the address of the {@link LeaderContender} under which other instances can connect
+	 * to it.
+	 *
+	 * @return Address of this contender.
+	 */
+	String getAddress();
+
+	/**
+	 * Callback method which is called by {@link LeaderElectionService} in case of an error in the
+	 * service thread.
+	 *
+	 * @param exception Caught exception
+	 */
+	void handleError(Exception exception);
+}