You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/03/22 11:14:25 UTC

[2/2] flink git commit: [FLINK-6134] Set UUID(0L, 0L) as default leader session id

[FLINK-6134] Set UUID(0L, 0L) as default leader session id

Before the default leader session id was null in the standalone case. However, in the ZooKeeper
case null indicated that there was no active leader available. With this commit, the default
leader id will be set to UUID(0L, 0L). This allows the uniform treatment of null denoting that
there is no active leader across the standalone and the ZooKeeper case.

With this change, the FlinkActors will now ignore all LeaderSessionMessages if the actors's
leader id field is null. This indicates that the FlinkActor does not know the current leader.

This closes #3578.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2dfd463e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2dfd463e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2dfd463e

Branch: refs/heads/master
Commit: 2dfd463e2a3ca2cc1428753b51a980f181a468a9
Parents: 034d9a3
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Mar 20 18:15:57 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Mar 22 10:26:40 2017 +0100

----------------------------------------------------------------------
 .../apache/flink/client/program/ClientTest.java | 37 ++++----
 .../MesosFlinkResourceManagerTest.java          |  3 +-
 .../BackPressureStatsTrackerITCase.java         |  3 +-
 .../StackTraceSampleCoordinatorITCase.java      |  3 +-
 .../flink/runtime/akka/FlinkUntypedActor.java   | 14 ++-
 .../flink/runtime/client/JobClientActor.java    | 98 ++++++++++++++------
 .../clusterframework/FlinkResourceManager.java  |  4 +-
 .../runtime/instance/AkkaActorGateway.java      |  5 +-
 .../StandaloneLeaderElectionService.java        |  3 +-
 .../StandaloneLeaderRetrievalService.java       |  4 +-
 .../runtime/LeaderSessionMessageFilter.scala    | 21 +++--
 .../runtime/messages/JobClientMessages.scala    |  9 +-
 .../runtime/messages/JobManagerMessages.scala   |  2 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  5 +-
 .../runtime/client/JobClientActorTest.java      | 29 +++---
 .../jobmanager/JobManagerHARecoveryTest.java    |  5 +-
 .../runtime/jobmanager/JobManagerTest.java      | 41 ++++----
 .../flink/runtime/jobmaster/JobMasterTest.java  |  4 +-
 .../LeaderElectionRetrievalTestingCluster.java  |  8 +-
 .../StandaloneLeaderElectionTest.java           |  5 +-
 .../TestingLeaderRetrievalService.java          |  6 +-
 .../runtime/metrics/TaskManagerMetricsTest.java |  7 +-
 .../AkkaKvStateLocationLookupServiceTest.java   | 24 +++--
 .../resourcemanager/JobLeaderIdServiceTest.java | 15 ++-
 .../ResourceManagerJobMasterTest.java           | 13 ++-
 .../taskexecutor/TaskExecutorITCase.java        |  2 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  | 24 +++--
 .../TaskManagerRegistrationTest.java            |  3 +-
 .../runtime/taskmanager/TaskManagerTest.java    |  7 +-
 .../src/test/resources/log4j-test.properties    |  2 +-
 .../jobmanager/JobManagerRegistrationTest.scala | 24 +++--
 .../testingUtils/ScalaTestingUtils.scala        |  3 +-
 .../runtime/testingUtils/TestingUtils.scala     | 15 +--
 .../JobManagerHAJobGraphRecoveryITCase.java     | 88 ------------------
 .../LocalFlinkMiniClusterITCase.java            |  3 +-
 .../java/org/apache/flink/yarn/UtilsTest.java   |  4 +-
 36 files changed, 292 insertions(+), 251 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 96785f4..75cb0e7 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -42,11 +42,13 @@ import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.NetUtils;
 
+import org.apache.flink.util.TestLogger;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -54,6 +56,7 @@ import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import java.io.IOException;
 import java.net.URL;
 import java.util.Collections;
 import java.util.UUID;
@@ -68,7 +71,7 @@ import static org.mockito.Mockito.when;
 /**
  * Simple and maybe stupid test to check the {@link ClusterClient} class.
  */
-public class ClientTest {
+public class ClientTest extends TestLogger {
 
 	private PackagedProgram program;
 
@@ -217,27 +220,21 @@ public class ClientTest {
 	 * This test verifies correct that the correct exception is thrown when the job submission fails.
 	 */
 	@Test
-	public void shouldSubmitToJobClientFails() {
-		try {
-			jobManagerSystem.actorOf(Props.create(FailureReturningActor.class), JobManager.JOB_MANAGER_NAME());
+	public void shouldSubmitToJobClientFails() throws IOException {
+		jobManagerSystem.actorOf(Props.create(FailureReturningActor.class), JobManager.JOB_MANAGER_NAME());
 
-			ClusterClient out = new StandaloneClusterClient(config);
-			out.setDetached(true);
+		ClusterClient out = new StandaloneClusterClient(config);
+		out.setDetached(true);
 
-			try {
-				out.run(program.getPlanWithJars(), 1);
-				fail("This should fail with an exception");
-			}
-			catch (ProgramInvocationException e) {
-				// bam!
-			}
-			catch (Exception e) {
-				fail("wrong exception " + e);
-			}
+		try {
+			out.run(program.getPlanWithJars(), 1);
+			fail("This should fail with an exception");
+		}
+		catch (ProgramInvocationException e) {
+			// bam!
 		}
 		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
+			fail("wrong exception " + e);
 		}
 	}
 
@@ -308,7 +305,7 @@ public class ClientTest {
 
 	public static class SuccessReturningActor extends FlinkUntypedActor {
 
-		private UUID leaderSessionID = null;
+		private UUID leaderSessionID = HighAvailabilityServices.DEFAULT_LEADER_ID;
 
 		@Override
 		public void handleMessage(Object message) {
@@ -338,7 +335,7 @@ public class ClientTest {
 
 	public static class FailureReturningActor extends FlinkUntypedActor {
 
-		private UUID leaderSessionID = null;
+		private UUID leaderSessionID = HighAvailabilityServices.DEFAULT_LEADER_ID;
 
 		@Override
 		public void handleMessage(Object message) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
index dcf6a82..c854a17 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameter
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.flink.runtime.clusterframework.messages.*;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -208,7 +209,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 					TestingMesosFlinkResourceManager.class,
 					config, mesosConfig, workerStore, retrievalService, tmParams, containerSpecification, artifactResolver, LOG));
 			resourceManagerInstance = resourceManagerRef.underlyingActor();
-			resourceManager = new AkkaActorGateway(resourceManagerRef, null);
+			resourceManager = new AkkaActorGateway(resourceManagerRef, HighAvailabilityServices.DEFAULT_LEADER_ID);
 
 			verify(schedulerDriver).start();
 			resourceManagerInstance.connectionMonitor.expectMsgClass(ConnectionMonitor.Start.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
index 46f8be6..e80c509 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -138,7 +139,7 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
 					@Override
 					protected void run() {
 						try {
-							ActorGateway testActor = new AkkaActorGateway(getTestActor(), null);
+							ActorGateway testActor = new AkkaActorGateway(getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID);
 
 							// Submit the job and wait until it is running
 							JobClient.submitJobDetached(

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
index a44e212..5463384 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -110,7 +111,7 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger {
 					@Override
 					protected void run() {
 						try {
-							ActorGateway testActor = new AkkaActorGateway(getTestActor(), null);
+							ActorGateway testActor = new AkkaActorGateway(getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID);
 
 							int maxAttempts = 10;
 							int sleepTime = 100;

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
index 3255778..05ae501 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
@@ -84,10 +84,14 @@ public abstract class FlinkUntypedActor extends UntypedActor {
 			UUID expectedID = getLeaderSessionID();
 			UUID actualID = msg.leaderSessionID();
 
-			if(expectedID == actualID || (expectedID != null && expectedID.equals(actualID))) {
-				handleMessage(msg.message());
+			if (expectedID != null) {
+				if (expectedID.equals(actualID)) {
+					handleMessage(msg.message());
+				} else {
+					handleDiscardedMessage(expectedID, msg);
+				}
 			} else {
-				handleDiscardedMessage(expectedID, msg);
+				handleNoLeaderId(msg);
 			}
 		} else if (message instanceof RequiresLeaderSessionID) {
 			throw new Exception("Received a message " + message + " without a leader session " +
@@ -104,6 +108,10 @@ public abstract class FlinkUntypedActor extends UntypedActor {
 				msg.leaderSessionID());
 	}
 
+	private void handleNoLeaderId(LeaderSessionMessage msg) {
+		LOG.warn("Discard message {} because there is currently no valid leader id known.", msg);
+	}
+
 	/**
 	 * This method contains the actor logic which defines how to react to incoming messages.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
index 1380e76..368a2b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.client;
 
 import akka.actor.ActorRef;
+import akka.actor.Cancellable;
 import akka.actor.PoisonPill;
 import akka.actor.Status;
 import akka.actor.Terminated;
@@ -36,6 +37,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.util.Preconditions;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.util.Objects;
 import java.util.UUID;
 
 
@@ -66,6 +68,10 @@ public abstract class JobClientActor extends FlinkUntypedActor implements Leader
 	/** The client which the actor is responsible for */
 	protected ActorRef client;
 
+	private Cancellable connectionTimeout;
+
+	private UUID connectionTimeoutId;
+
 	public JobClientActor(
 			LeaderRetrievalService leaderRetrievalService,
 			FiniteDuration timeout,
@@ -73,6 +79,11 @@ public abstract class JobClientActor extends FlinkUntypedActor implements Leader
 		this.leaderRetrievalService = Preconditions.checkNotNull(leaderRetrievalService);
 		this.timeout = Preconditions.checkNotNull(timeout);
 		this.sysoutUpdates = sysoutUpdates;
+		this.jobManager = ActorRef.noSender();
+		this.leaderSessionID = null;
+
+		connectionTimeout = null;
+		connectionTimeoutId = null;
 	}
 
 	@Override
@@ -146,6 +157,9 @@ public abstract class JobClientActor extends FlinkUntypedActor implements Leader
 							getSelf().tell(decorateMessage(new JobManagerActorRef(result)), ActorRef.noSender());
 						}
 					}, getContext().dispatcher());
+			} else if (isClientConnected() && connectionTimeoutId == null) {
+				// msg.address == null means that the leader has lost its leadership
+				registerConnectionTimeout();
 			}
 		} else if (message instanceof JobManagerActorRef) {
 			// Resolved JobManager ActorRef
@@ -184,36 +198,35 @@ public abstract class JobClientActor extends FlinkUntypedActor implements Leader
 					jobManager.path());
 				disconnectFromJobManager();
 
-				// we only issue a connection timeout if we have submitted a job before
-				// otherwise, we might have some more time to find another job manager
-				// Important: The ConnectionTimeout message is filtered out in case that we are
-				// notified about a new leader by setting the new leader session ID, because
-				// ConnectionTimeout extends RequiresLeaderSessionID
 				if (isClientConnected()) {
-					getContext().system().scheduler().scheduleOnce(
-						timeout,
-						getSelf(),
-						decorateMessage(JobClientMessages.getConnectionTimeout()),
-						getContext().dispatcher(),
-						ActorRef.noSender());
+					if (connectionTimeoutId == null) {
+						// only register a connection timeout if we haven't done this before
+						registerConnectionTimeout();
+					}
 				}
 			} else {
 				LOG.warn("Received 'Terminated' for unknown actor " + target);
 			}
 		}
-		else if (JobClientMessages.getConnectionTimeout().equals(message)) {
-			// check if we haven't found a job manager yet
-			if (!isJobManagerConnected()) {
-				final JobClientActorConnectionTimeoutException errorMessage =
-					new JobClientActorConnectionTimeoutException("Lost connection to the JobManager.");
-				final Object replyMessage = decorateMessage(new Status.Failure(errorMessage));
-				if (isClientConnected()) {
-					client.tell(
-						replyMessage,
-						getSelf());
+		else if (message instanceof JobClientMessages.ConnectionTimeout) {
+			JobClientMessages.ConnectionTimeout timeoutMessage = (JobClientMessages.ConnectionTimeout) message;
+
+			if (Objects.equals(connectionTimeoutId, timeoutMessage.id())) {
+				// check if we haven't found a job manager yet
+				if (!isJobManagerConnected()) {
+					final JobClientActorConnectionTimeoutException errorMessage =
+						new JobClientActorConnectionTimeoutException("Lost connection to the JobManager.");
+					final Object replyMessage = decorateMessage(new Status.Failure(errorMessage));
+					if (isClientConnected()) {
+						client.tell(
+							replyMessage,
+							getSelf());
+					}
+					// Connection timeout reached, let's terminate
+					terminate();
 				}
-				// Connection timeout reached, let's terminate
-				terminate();
+			} else {
+				LOG.debug("Received outdated connection timeout.");
 			}
 		}
 
@@ -225,13 +238,10 @@ public abstract class JobClientActor extends FlinkUntypedActor implements Leader
 				message);
 			// We want to submit/attach to a job, but we haven't found a job manager yet.
 			// Let's give him another chance to find a job manager within the given timeout.
-			getContext().system().scheduler().scheduleOnce(
-				timeout,
-				getSelf(),
-				decorateMessage(JobClientMessages.getConnectionTimeout()),
-				getContext().dispatcher(),
-				ActorRef.noSender()
-			);
+			if (connectionTimeoutId == null) {
+				// only register the connection timeout once
+				registerConnectionTimeout();
+			}
 			handleCustomMessage(message);
 		}
 		else {
@@ -304,6 +314,8 @@ public abstract class JobClientActor extends FlinkUntypedActor implements Leader
 			getContext().unwatch(jobManager);
 			jobManager = ActorRef.noSender();
 		}
+
+		leaderSessionID = null;
 	}
 
 	private void connectToJobManager(ActorRef jobManager) {
@@ -316,6 +328,8 @@ public abstract class JobClientActor extends FlinkUntypedActor implements Leader
 
 		this.jobManager = jobManager;
 		getContext().watch(jobManager);
+
+		unregisterConnectionTimeout();
 	}
 
 	protected void terminate() {
@@ -333,4 +347,28 @@ public abstract class JobClientActor extends FlinkUntypedActor implements Leader
 		return client != ActorRef.noSender();
 	}
 
+	private void registerConnectionTimeout() {
+		if (connectionTimeout != null) {
+			connectionTimeout.cancel();
+		}
+
+		connectionTimeoutId = UUID.randomUUID();
+
+		connectionTimeout = getContext().system().scheduler().scheduleOnce(
+			timeout,
+			getSelf(),
+			decorateMessage(new JobClientMessages.ConnectionTimeout(connectionTimeoutId)),
+			getContext().dispatcher(),
+			ActorRef.noSender()
+		);
+	}
+
+	private void unregisterConnectionTimeout() {
+		if (connectionTimeout != null) {
+			connectionTimeout.cancel();
+			connectionTimeout = null;
+			connectionTimeoutId = null;
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
index 911c1f6..77dbad4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
@@ -410,9 +410,9 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva
 		// disconnect from the current leader (no-op if no leader yet)
 		jobManagerLostLeadership();
 
-		// a null leader address means that only a leader disconnect
+		// a null leader session id means that only a leader disconnect
 		// happened, without a new leader yet
-		if (leaderAddress != null) {
+		if (leaderSessionID != null && leaderAddress != null) {
 			// the leaderSessionID implicitly filters out success and failure messages
 			// that come after leadership changed again
 			this.leaderSessionID = leaderSessionID;

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java
index adeae03..26b6176 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java
@@ -24,6 +24,7 @@ import akka.util.Timeout;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.messages.LeaderSessionMessageDecorator;
 import org.apache.flink.runtime.messages.MessageDecorator;
+import org.apache.flink.util.Preconditions;
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -48,8 +49,8 @@ public class AkkaActorGateway implements ActorGateway, Serializable {
 	private final MessageDecorator decorator;
 
 	public AkkaActorGateway(ActorRef actor, UUID leaderSessionID) {
-		this.actor = actor;
-		this.leaderSessionID = leaderSessionID;
+		this.actor = Preconditions.checkNotNull(actor);
+		this.leaderSessionID = Preconditions.checkNotNull(leaderSessionID);
 		// we want to wrap RequiresLeaderSessionID messages in a LeaderSessionMessage
 		this.decorator = new LeaderSessionMessageDecorator(leaderSessionID);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java
index 2d36616..a956a5e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.leaderelection;
 
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.util.Preconditions;
 
 import java.util.UUID;
@@ -42,7 +43,7 @@ public class StandaloneLeaderElectionService implements LeaderElectionService {
 		contender = Preconditions.checkNotNull(newContender);
 
 		// directly grant leadership to the given contender
-		contender.grantLeadership(null);
+		contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
index 4ad4646..174e106 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.leaderretrieval;
 
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+
 import java.util.UUID;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -54,7 +56,7 @@ public class StandaloneLeaderRetrievalService implements LeaderRetrievalService
 	@Deprecated
 	public StandaloneLeaderRetrievalService(String leaderAddress) {
 		this.leaderAddress = checkNotNull(leaderAddress);
-		this.leaderId = null;
+		this.leaderId = HighAvailabilityServices.DEFAULT_LEADER_ID;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageFilter.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageFilter.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageFilter.scala
index 1fb32ce..1a6be25 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageFilter.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageFilter.scala
@@ -32,10 +32,15 @@ trait LeaderSessionMessageFilter extends FlinkActor {
 
   abstract override def receive: Receive = {
     case leaderMessage @ LeaderSessionMessage(msgID, msg) =>
-      if (leaderSessionID.equals(Option(msgID))) {
-        super.receive(msg)
-      } else {
-        handleDiscardedMessage(leaderSessionID, leaderMessage)
+      leaderSessionID match {
+        case Some(leaderId) =>
+          if (leaderId.equals(msgID)) {
+            super.receive(msg)
+          } else {
+            handleDiscardedMessage(leaderId, leaderMessage)
+          }
+        case None =>
+          handleNoLeaderId(leaderMessage)
       }
     case msg: RequiresLeaderSessionID =>
       throw new Exception(s"Received a message $msg without a leader session ID, even though" +
@@ -45,12 +50,16 @@ trait LeaderSessionMessageFilter extends FlinkActor {
   }
 
   private def handleDiscardedMessage(
-      expectedLeaderSessionID: Option[UUID],
+      expectedLeaderSessionID: UUID,
       msg: LeaderSessionMessage)
     : Unit = {
     log.warn(s"Discard message $msg because the expected leader session ID " +
       s"$expectedLeaderSessionID did not equal the received leader session ID " +
-      s"${Option(msg.leaderSessionID)}.")
+      s"${msg.leaderSessionID}.")
+  }
+
+  private def handleNoLeaderId(msg: LeaderSessionMessage): Unit = {
+    log.warn(s"Discard message $msg because there is currently no valid leader id known.")
   }
 
   /** Wrap [[RequiresLeaderSessionID]] messages in a [[LeaderSessionMessage]]

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala
index 1f29e32..7268b0f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala
@@ -66,10 +66,13 @@ object JobClientMessages {
   /** Message which is triggered when the JobClient registration at the JobManager times out */
   case object RegistrationTimeout extends RequiresLeaderSessionID
 
-  /** Message which is triggered when the connection timeout has been reached. */
-  case object ConnectionTimeout extends RequiresLeaderSessionID
+  /**
+    * Message which is triggered when the connection timeout has been reached.
+    *
+    * @param id Timeout id which identifies the concurrent timeouts
+    */
+  case class ConnectionTimeout(id: UUID)
 
   def getSubmissionTimeout(): AnyRef = SubmissionTimeout
   def getRegistrationTimeout(): AnyRef = RegistrationTimeout
-  def getConnectionTimeout(): AnyRef = ConnectionTimeout
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index f3025ab..4db2584 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -45,7 +45,7 @@ object JobManagerMessages {
     * [[RequiresLeaderSessionID]] interface and have to be wrapped in a [[LeaderSessionMessage]],
     * which also contains the current leader session ID.
     *
-    * @param leaderSessionID Current leader session ID or null, if no leader session ID was set
+    * @param leaderSessionID Current leader session ID
     * @param message [[RequiresLeaderSessionID]] message to be wrapped in a [[LeaderSessionMessage]]
     */
   case class LeaderSessionMessage(leaderSessionID: UUID, message: Any)

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 25d5366..2e8a6fa 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1434,7 +1434,10 @@ class TaskManager(
     this.jobManagerAkkaURL = Option(newJobManagerAkkaURL)
     this.leaderSessionID = Option(leaderSessionID)
 
-    triggerTaskManagerRegistration()
+    if (this.leaderSessionID.isDefined) {
+      // only trigger the registration if we have obtained a valid leader id (!= null)
+      triggerTaskManagerRegistration()
+    }
   }
 
   /** Starts the TaskManager's registration process to connect to the JobManager.

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
index 2e3384f..0ec00df 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
@@ -28,6 +28,7 @@ import akka.util.Timeout;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -73,7 +74,7 @@ public class JobClientActorTest extends TestLogger {
 	 */
 	@Test(expected=JobClientActorSubmissionTimeoutException.class)
 	public void testSubmissionTimeout() throws Exception {
-		FiniteDuration jobClientActorTimeout = new FiniteDuration(5, TimeUnit.SECONDS);
+		FiniteDuration jobClientActorTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
 		FiniteDuration timeout = jobClientActorTimeout.$times(2);
 
 		UUID leaderSessionID = UUID.randomUUID();
@@ -112,7 +113,7 @@ public class JobClientActorTest extends TestLogger {
 	 */
 	@Test(expected=JobClientActorRegistrationTimeoutException.class)
 	public void testRegistrationTimeout() throws Exception {
-		FiniteDuration jobClientActorTimeout = new FiniteDuration(5, TimeUnit.SECONDS);
+		FiniteDuration jobClientActorTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
 		FiniteDuration timeout = jobClientActorTimeout.$times(2);
 
 		UUID leaderSessionID = UUID.randomUUID();
@@ -142,17 +143,19 @@ public class JobClientActorTest extends TestLogger {
 		Await.result(jobExecutionResult, timeout);
 	}
 
-	/** Tests that a {@link org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException}
+	/** Tests that a {@link JobClientActorConnectionTimeoutException}
 	 * is thrown when the JobSubmissionClientActor wants to submit a job but has not connected to a JobManager.
 	 *
 	 * @throws Exception
 	 */
 	@Test(expected=JobClientActorConnectionTimeoutException.class)
 	public void testConnectionTimeoutWithoutJobManagerForSubmission() throws Exception {
-		FiniteDuration jobClientActorTimeout = new FiniteDuration(5, TimeUnit.SECONDS);
+		FiniteDuration jobClientActorTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
 		FiniteDuration timeout = jobClientActorTimeout.$times(2);
 
-		TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService();
+		TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(
+			"localhost",
+			HighAvailabilityServices.DEFAULT_LEADER_ID);
 
 		Props jobClientActorProps = JobSubmissionClientActor.createActorProps(
 			testingLeaderRetrievalService,
@@ -170,16 +173,18 @@ public class JobClientActorTest extends TestLogger {
 		Await.result(jobExecutionResult, timeout);
 	}
 
-	/** Tests that a {@link org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException}
+	/** Tests that a {@link JobClientActorConnectionTimeoutException}
 	 * is thrown when the JobAttachmentClientActor attach to a job at the JobManager
 	 * but has not connected to a JobManager.
 	 */
 	@Test(expected=JobClientActorConnectionTimeoutException.class)
 	public void testConnectionTimeoutWithoutJobManagerForRegistration() throws Exception {
-		FiniteDuration jobClientActorTimeout = new FiniteDuration(5, TimeUnit.SECONDS);
+		FiniteDuration jobClientActorTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
 		FiniteDuration timeout = jobClientActorTimeout.$times(2);
 
-		TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService();
+		TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(
+			"localhost",
+			HighAvailabilityServices.DEFAULT_LEADER_ID);
 
 		Props jobClientActorProps = JobAttachmentClientActor.createActorProps(
 			testingLeaderRetrievalService,
@@ -203,7 +208,7 @@ public class JobClientActorTest extends TestLogger {
 	 */
 	@Test(expected=JobClientActorConnectionTimeoutException.class)
 	public void testConnectionTimeoutAfterJobSubmission() throws Exception {
-		FiniteDuration jobClientActorTimeout = new FiniteDuration(5, TimeUnit.SECONDS);
+		FiniteDuration jobClientActorTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
 		FiniteDuration timeout = jobClientActorTimeout.$times(2);
 
 		UUID leaderSessionID = UUID.randomUUID();
@@ -245,8 +250,8 @@ public class JobClientActorTest extends TestLogger {
 	 */
 	@Test(expected=JobClientActorConnectionTimeoutException.class)
 	public void testConnectionTimeoutAfterJobRegistration() throws Exception {
-		FiniteDuration jobClientActorTimeout = new FiniteDuration(5, TimeUnit.SECONDS);
-		FiniteDuration timeout = jobClientActorTimeout.$times(2);
+		FiniteDuration jobClientActorTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
+		FiniteDuration timeout = jobClientActorTimeout.$times(2L);
 
 		UUID leaderSessionID = UUID.randomUUID();
 
@@ -287,7 +292,7 @@ public class JobClientActorTest extends TestLogger {
 	 */
 	@Test
 	public void testGuaranteedAnswerIfJobClientDies() throws Exception {
-		FiniteDuration timeout = new FiniteDuration(2, TimeUnit.SECONDS);
+		FiniteDuration timeout = new FiniteDuration(2L, TimeUnit.SECONDS);
 
 			UUID leaderSessionID = UUID.randomUUID();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 32358c0..2ebc36e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.instance.InstanceManager;
@@ -166,7 +167,9 @@ public class JobManagerHARecoveryTest {
 			CheckpointIDCounter checkpointCounter = new StandaloneCheckpointIDCounter();
 			CheckpointRecoveryFactory checkpointStateFactory = new MyCheckpointRecoveryFactory(checkpointStore, checkpointCounter);
 			TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService();
-			TestingLeaderRetrievalService myLeaderRetrievalService = new TestingLeaderRetrievalService();
+			TestingLeaderRetrievalService myLeaderRetrievalService = new TestingLeaderRetrievalService(
+				"localhost",
+				HighAvailabilityServices.DEFAULT_LEADER_ID);
 
 			InstanceManager instanceManager = new InstanceManager();
 			instanceManager.addInstanceListener(scheduler);

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 727fc65..3944752c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -86,6 +87,7 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.StoppableInvokable;
+import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -120,7 +122,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-public class JobManagerTest {
+public class JobManagerTest extends TestLogger {
 
 	@Rule
 	public TemporaryFolder tmpFolder = new TemporaryFolder();
@@ -183,7 +185,7 @@ public class JobManagerTest {
 							TestingUtils.TESTING_DURATION());
 
 						// we can set the leader session ID to None because we don't use this gateway to send messages
-						final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null);
+						final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID);
 
 						// Submit the job and wait for all vertices to be running
 						jobManagerGateway.tell(
@@ -304,7 +306,7 @@ public class JobManagerTest {
 							TestingUtils.TESTING_DURATION());
 
 						// we can set the leader session ID to None because we don't use this gateway to send messages
-						final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null);
+						final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID);
 
 						// Submit the job and wait for all vertices to be running
 						jobManagerGateway.tell(
@@ -395,7 +397,7 @@ public class JobManagerTest {
 							TestingUtils.TESTING_DURATION());
 
 						// we can set the leader session ID to None because we don't use this gateway to send messages
-						final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null);
+						final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID);
 
 						// Submit the job and wait for all vertices to be running
 						jobManagerGateway.tell(
@@ -478,7 +480,7 @@ public class JobManagerTest {
 						final ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
 
 						// we can set the leader session ID to None because we don't use this gateway to send messages
-						final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null);
+						final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID);
 
 						// Submit the job and wait for all vertices to be running
 						jobManagerGateway.tell(
@@ -530,7 +532,7 @@ public class JobManagerTest {
 						final ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
 
 						// we can set the leader session ID to None because we don't use this gateway to send messages
-						final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null);
+						final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID);
 
 						// Submit the job and wait for all vertices to be running
 						jobManagerGateway.tell(
@@ -572,7 +574,6 @@ public class JobManagerTest {
 		Configuration config = new Configuration();
 		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100ms");
 
-		UUID leaderSessionId = null;
 		ActorGateway jobManager = new AkkaActorGateway(
 				JobManager.startJobManagerActors(
 					config,
@@ -581,7 +582,7 @@ public class JobManagerTest {
 					TestingUtils.defaultExecutor(),
 					TestingJobManager.class,
 					MemoryArchivist.class)._1(),
-				leaderSessionId);
+				HighAvailabilityServices.DEFAULT_LEADER_ID);
 
 		LeaderRetrievalService leaderRetrievalService = new StandaloneLeaderRetrievalService(
 				AkkaUtils.getAkkaURL(system, jobManager.actor()));
@@ -794,8 +795,8 @@ public class JobManagerTest {
 				TestingJobManager.class,
 				TestingMemoryArchivist.class);
 
-			jobManager = new AkkaActorGateway(master._1(), null);
-			archiver = new AkkaActorGateway(master._2(), null);
+			jobManager = new AkkaActorGateway(master._1(), HighAvailabilityServices.DEFAULT_LEADER_ID);
+			archiver = new AkkaActorGateway(master._2(), HighAvailabilityServices.DEFAULT_LEADER_ID);
 
 			ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
 					config,
@@ -807,7 +808,7 @@ public class JobManagerTest {
 					true,
 					TestingTaskManager.class);
 
-			taskManager = new AkkaActorGateway(taskManagerRef, null);
+			taskManager = new AkkaActorGateway(taskManagerRef, HighAvailabilityServices.DEFAULT_LEADER_ID);
 
 			// Wait until connected
 			Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
@@ -920,8 +921,8 @@ public class JobManagerTest {
 				TestingJobManager.class,
 				TestingMemoryArchivist.class);
 
-			jobManager = new AkkaActorGateway(master._1(), null);
-			archiver = new AkkaActorGateway(master._2(), null);
+			jobManager = new AkkaActorGateway(master._1(), HighAvailabilityServices.DEFAULT_LEADER_ID);
+			archiver = new AkkaActorGateway(master._2(), HighAvailabilityServices.DEFAULT_LEADER_ID);
 
 			ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
 				config,
@@ -933,7 +934,7 @@ public class JobManagerTest {
 				true,
 				TestingTaskManager.class);
 
-			taskManager = new AkkaActorGateway(taskManagerRef, null);
+			taskManager = new AkkaActorGateway(taskManagerRef, HighAvailabilityServices.DEFAULT_LEADER_ID);
 
 			// Wait until connected
 			Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
@@ -1026,8 +1027,8 @@ public class JobManagerTest {
 				TestingJobManager.class,
 				TestingMemoryArchivist.class);
 
-			jobManager = new AkkaActorGateway(master._1(), null);
-			archiver = new AkkaActorGateway(master._2(), null);
+			jobManager = new AkkaActorGateway(master._1(), HighAvailabilityServices.DEFAULT_LEADER_ID);
+			archiver = new AkkaActorGateway(master._2(), HighAvailabilityServices.DEFAULT_LEADER_ID);
 
 			ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
 					config,
@@ -1039,7 +1040,7 @@ public class JobManagerTest {
 					true,
 					TestingTaskManager.class);
 
-			taskManager = new AkkaActorGateway(taskManagerRef, null);
+			taskManager = new AkkaActorGateway(taskManagerRef, HighAvailabilityServices.DEFAULT_LEADER_ID);
 
 			// Wait until connected
 			Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
@@ -1126,8 +1127,8 @@ public class JobManagerTest {
 				TestingJobManager.class,
 				TestingMemoryArchivist.class);
 
-			jobManager = new AkkaActorGateway(master._1(), null);
-			archiver = new AkkaActorGateway(master._2(), null);
+			jobManager = new AkkaActorGateway(master._1(), HighAvailabilityServices.DEFAULT_LEADER_ID);
+			archiver = new AkkaActorGateway(master._2(), HighAvailabilityServices.DEFAULT_LEADER_ID);
 
 			Configuration tmConfig = new Configuration();
 			tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
@@ -1142,7 +1143,7 @@ public class JobManagerTest {
 					true,
 					TestingTaskManager.class);
 
-			taskManager = new AkkaActorGateway(taskManagerRef, null);
+			taskManager = new AkkaActorGateway(taskManagerRef, HighAvailabilityServices.DEFAULT_LEADER_ID);
 
 			// Wait until connected
 			Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 567a8fc..43536b6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -64,7 +64,9 @@ public class JobMasterTest extends TestLogger {
 	@Test
 	public void testHeartbeatTimeoutWithTaskManager() throws Exception {
 		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
-		final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService();
+		final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService(
+			null,
+			null);
 		haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
 		haServices.setCheckpointRecoveryFactory(mock(CheckpointRecoveryFactory.class));
 		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
index c143fe2..1cab0ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
@@ -53,8 +53,8 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
 		this.userConfiguration = userConfiguration;
 		this.useSingleActorSystem = singleActorSystem;
 
-		leaderElectionServices = new ArrayList<TestingLeaderElectionService>();
-		leaderRetrievalServices = new ArrayList<TestingLeaderRetrievalService>();
+		leaderElectionServices = new ArrayList<>();
+		leaderRetrievalServices = new ArrayList<>();
 	}
 
 	@Override
@@ -78,7 +78,9 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
 
 	@Override
 	public LeaderRetrievalService createLeaderRetrievalService() {
-		leaderRetrievalServices.add(new TestingLeaderRetrievalService());
+		leaderRetrievalServices.add(new TestingLeaderRetrievalService(
+			null,
+			null));
 
 		return leaderRetrievalServices.get(leaderRetrievalServices.size() - 1);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java
index b04be63..18b620f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.leaderelection;
 
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
@@ -46,12 +47,12 @@ public class StandaloneLeaderElectionTest extends TestLogger {
 			contender.waitForLeader(1000l);
 
 			assertTrue(contender.isLeader());
-			assertEquals(null, contender.getLeaderSessionID());
+			assertEquals(HighAvailabilityServices.DEFAULT_LEADER_ID, contender.getLeaderSessionID());
 
 			testingListener.waitForNewLeader(1000l);
 
 			assertEquals(TEST_URL, testingListener.getAddress());
-			assertEquals(null, testingListener.getLeaderSessionID());
+			assertEquals(HighAvailabilityServices.DEFAULT_LEADER_ID, testingListener.getLeaderSessionID());
 		} finally {
 			leaderElectionService.stop();
 			leaderRetrievalService.stop();

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
index d6bcaaf..887772a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
@@ -35,10 +35,6 @@ public class TestingLeaderRetrievalService implements LeaderRetrievalService {
 
 	private volatile LeaderRetrievalListener listener;
 
-	public TestingLeaderRetrievalService() {
-		this(null, null);
-	}
-
 	public TestingLeaderRetrievalService(String leaderAddress, UUID leaderSessionID) {
 		this.leaderAddress = leaderAddress;
 		this.leaderSessionID = leaderSessionID;
@@ -48,7 +44,7 @@ public class TestingLeaderRetrievalService implements LeaderRetrievalService {
 	public void start(LeaderRetrievalListener listener) throws Exception {
 		this.listener = Preconditions.checkNotNull(listener);
 
-		if (leaderAddress != null) {
+		if (leaderSessionID != null && leaderAddress != null) {
 			listener.notifyLeaderAddress(leaderAddress, leaderSessionID);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
index aed2b6f..100c83d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
@@ -25,6 +25,7 @@ import akka.testkit.JavaTestKit;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -109,7 +110,11 @@ public class TaskManagerMetricsTest {
 						expectMsgEquals(TaskManagerMessages.getRegisteredAtJobManagerMessage());
 
 						// trigger re-registration of TM; this should include a disconnect from the current JM
-						taskManager.tell(new TaskManagerMessages.JobManagerLeaderAddress(jobManager.path().toString(), null), jobManager);
+						taskManager.tell(
+							new TaskManagerMessages.JobManagerLeaderAddress(
+								jobManager.path().toString(),
+								HighAvailabilityServices.DEFAULT_LEADER_ID),
+							jobManager);
 
 						// wait for re-registration to be completed
 						taskManager.tell(TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java
index e9950fb..34e3174 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java
@@ -26,12 +26,14 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.query.AkkaKvStateLocationLookupService.LookupRetryStrategy;
 import org.apache.flink.runtime.query.AkkaKvStateLocationLookupService.LookupRetryStrategyFactory;
 import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -51,7 +53,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-public class AkkaKvStateLocationLookupServiceTest {
+public class AkkaKvStateLocationLookupServiceTest extends TestLogger {
 
 	/** The default timeout. */
 	private static final FiniteDuration TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
@@ -77,7 +79,9 @@ public class AkkaKvStateLocationLookupServiceTest {
 	 */
 	@Test
 	public void testNoJobManagerRegistered() throws Exception {
-		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService();
+		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(
+			null,
+			null);
 		Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>();
 
 		AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
@@ -108,7 +112,7 @@ public class AkkaKvStateLocationLookupServiceTest {
 		//
 		// Leader registration => communicate with new leader
 		//
-		UUID leaderSessionId = null;
+		UUID leaderSessionId = HighAvailabilityServices.DEFAULT_LEADER_ID;
 		KvStateLocation expected = new KvStateLocation(new JobID(), new JobVertexID(), 8282, "tea");
 
 		ActorRef testActor = LookupResponseActor.create(received, leaderSessionId, expected);
@@ -154,7 +158,9 @@ public class AkkaKvStateLocationLookupServiceTest {
 	 */
 	@Test
 	public void testLeaderSessionIdChange() throws Exception {
-		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService();
+		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(
+			"localhost",
+			HighAvailabilityServices.DEFAULT_LEADER_ID);
 		Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>();
 
 		AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
@@ -216,7 +222,9 @@ public class AkkaKvStateLocationLookupServiceTest {
 					}
 				};
 
-		final TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService();
+		final TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(
+			null,
+			null);
 
 		AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
 				leaderRetrievalService,
@@ -268,7 +276,7 @@ public class AkkaKvStateLocationLookupServiceTest {
 
 			@Override
 			public boolean tryRetry() {
-				leaderRetrievalService.notifyListener(testActorAddress, null);
+				leaderRetrievalService.notifyListener(testActorAddress, HighAvailabilityServices.DEFAULT_LEADER_ID);
 				return true;
 			}
 		});
@@ -279,7 +287,9 @@ public class AkkaKvStateLocationLookupServiceTest {
 
 	@Test
 	public void testUnexpectedResponseType() throws Exception {
-		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService();
+		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(
+			"localhost",
+			HighAvailabilityServices.DEFAULT_LEADER_ID);
 		Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>();
 
 		AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
index d5e99bd..149cc10 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.util.TestLogger;
@@ -64,7 +65,9 @@ public class JobLeaderIdServiceTest extends TestLogger {
 		final String address = "foobar";
 		final UUID leaderId = UUID.randomUUID();
 		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
-		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService();
+		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(
+			null,
+			null);
 
 		highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService);
 
@@ -98,7 +101,7 @@ public class JobLeaderIdServiceTest extends TestLogger {
 	public void testRemovingJob() throws Exception {
 		final JobID jobId = new JobID();
 		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
-		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService();
+		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(null, null);
 
 		highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService);
 
@@ -139,7 +142,9 @@ public class JobLeaderIdServiceTest extends TestLogger {
 	public void testInitialJobTimeout() throws Exception {
 		final JobID jobId = new JobID();
 		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
-		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService();
+		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(
+			null,
+			null);
 
 		highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService);
 
@@ -181,7 +186,9 @@ public class JobLeaderIdServiceTest extends TestLogger {
 		final String address = "foobar";
 		final UUID leaderId = UUID.randomUUID();
 		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
-		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService();
+		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(
+			null,
+			null);
 
 		highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index fb166d4..9a68eca 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.resourcemanager;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
@@ -112,7 +113,9 @@ public class ResourceManagerJobMasterTest {
 		String jobMasterAddress = "/jobMasterAddress1";
 		JobID jobID = mockJobMaster(jobMasterAddress);
 		TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
-		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService();
+		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(
+			"localhost",
+			HighAvailabilityServices.DEFAULT_LEADER_ID);
 		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
 		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
 		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
@@ -136,7 +139,9 @@ public class ResourceManagerJobMasterTest {
 		String jobMasterAddress = "/jobMasterAddress1";
 		JobID jobID = mockJobMaster(jobMasterAddress);
 		TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
-		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService();
+		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(
+			"localhost",
+			HighAvailabilityServices.DEFAULT_LEADER_ID);
 		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
 		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
 		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
@@ -160,7 +165,9 @@ public class ResourceManagerJobMasterTest {
 		String jobMasterAddress = "/jobMasterAddress1";
 		JobID jobID = mockJobMaster(jobMasterAddress);
 		TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
-		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService();
+		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(
+			"localhost",
+			HighAvailabilityServices.DEFAULT_LEADER_ID);
 		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
 		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
 		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index e74ba29..f6c2dce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -83,7 +83,7 @@ public class TaskExecutorITCase {
 		final ResourceID taskManagerResourceId = new ResourceID("foobar");
 		final UUID rmLeaderId = UUID.randomUUID();
 		final TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService();
-		final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService();
+		final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService(null, null);
 		final String rmAddress = "rm";
 		final String jmAddress = "jm";
 		final UUID jmLeaderId = UUID.randomUUID();

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 5702eeb..67196aa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -119,8 +119,12 @@ public class TaskExecutorTest extends TestLogger {
 		final TestingSerialRpcService rpc = new TestingSerialRpcService();
 		final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
 		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
-		final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService();
-		final TestingLeaderRetrievalService jmLeaderRetrievalService = new TestingLeaderRetrievalService();
+		final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService(
+			null,
+			null);
+		final TestingLeaderRetrievalService jmLeaderRetrievalService = new TestingLeaderRetrievalService(
+			null,
+			null);
 		haServices.setJobMasterLeaderRetriever(jobId, jmLeaderRetrievalService);
 		haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
 
@@ -296,7 +300,9 @@ public class TaskExecutorTest extends TestLogger {
 			rpc.registerGateway(address1, rmGateway1);
 			rpc.registerGateway(address2, rmGateway2);
 
-			TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService();
+			TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService(
+				null,
+				null);
 
 			TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
 			haServices.setResourceManagerLeaderRetriever(testLeaderService);
@@ -514,8 +520,12 @@ public class TaskExecutorTest extends TestLogger {
 		final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
 		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
 
-		final TestingLeaderRetrievalService resourceManagerLeaderRetrievalService = new TestingLeaderRetrievalService();
-		final TestingLeaderRetrievalService jobManagerLeaderRetrievalService = new TestingLeaderRetrievalService();
+		final TestingLeaderRetrievalService resourceManagerLeaderRetrievalService = new TestingLeaderRetrievalService(
+			null,
+			null);
+		final TestingLeaderRetrievalService jobManagerLeaderRetrievalService = new TestingLeaderRetrievalService(
+			null,
+			null);
 		haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetrievalService);
 		haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetrievalService);
 
@@ -735,7 +745,9 @@ public class TaskExecutorTest extends TestLogger {
 			ResourceManagerGateway rmGateway1 = mock(ResourceManagerGateway.class);
 			rpc.registerGateway(address1, rmGateway1);
 
-			TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService();
+			TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService(
+				"localhost",
+				HighAvailabilityServices.DEFAULT_LEADER_ID);
 
 			TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
 			haServices.setResourceManagerLeaderRetriever(testLeaderService);

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index e234cba..f3b1d4a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmanager.JobManager;
@@ -567,7 +568,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 				final ActorRef taskManager = taskManagerGateway.actor();
 
 				final UUID falseLeaderSessionID = UUID.randomUUID();
-				final UUID trueLeaderSessionID = null;
+				final UUID trueLeaderSessionID = HighAvailabilityServices.DEFAULT_LEADER_ID;
 
 				new Within(timeout) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 730595c..a754cff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.instance.InstanceID;
@@ -125,7 +126,7 @@ public class TaskManagerTest extends TestLogger {
 
 	private static ActorSystem system;
 
-	final static UUID leaderSessionID = null;
+	final static UUID leaderSessionID = HighAvailabilityServices.DEFAULT_LEADER_ID;
 
 	@BeforeClass
 	public static void setup() {
@@ -1165,8 +1166,8 @@ public class TaskManagerTest extends TestLogger {
 
 			// We need this to be a JM that answers to update messages for
 			// robustness on Travis (if jobs need to be resubmitted in (4)).
-			ActorRef jm = system.actorOf(Props.create(new SimpleLookupJobManagerCreator(null)));
-			ActorGateway jobManagerActorGateway = new AkkaActorGateway(jm, null);
+			ActorRef jm = system.actorOf(Props.create(new SimpleLookupJobManagerCreator(leaderSessionID)));
+			ActorGateway jobManagerActorGateway = new AkkaActorGateway(jm, leaderSessionID);
 
 			final ActorGateway testActorGateway = new AkkaActorGateway(
 					getTestActor(),

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/log4j-test.properties b/flink-runtime/src/test/resources/log4j-test.properties
index 7ba1633..98f136a 100644
--- a/flink-runtime/src/test/resources/log4j-test.properties
+++ b/flink-runtime/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=OFF, console
+log4j.rootLogger=INFO, console
 
 # -----------------------------------------------------------------------------
 # Console (use 'console')

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index dfcbf77..76d9591 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -27,12 +27,12 @@ import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices
 import org.apache.flink.runtime.instance._
 import org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.PlainForwardingActor
 import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage
 import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered, RegisterTaskManager}
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation
-
 import org.apache.flink.runtime.testutils.TestingResourceManager
 import org.apache.flink.runtime.util.LeaderRetrievalUtils
 import org.junit.Assert.{assertNotEquals, assertNotNull}
@@ -87,7 +87,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
             connectionInfo1,
             hardwareDescription,
             1),
-          new AkkaActorGateway(tm1, null))
+          new AkkaActorGateway(tm1, HighAvailabilityServices.DEFAULT_LEADER_ID))
 
         val response = expectMsgType[LeaderSessionMessage]
         response match {
@@ -104,7 +104,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
             connectionInfo2,
             hardwareDescription,
             1),
-          new AkkaActorGateway(tm2, null))
+          new AkkaActorGateway(tm2, HighAvailabilityServices.DEFAULT_LEADER_ID))
 
         val response = expectMsgType[LeaderSessionMessage]
         response match {
@@ -123,7 +123,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
       val jm = startTestingJobManager(_system)
       val rm = startTestingResourceManager(_system, jm.actor())
 
-      val selfGateway = new AkkaActorGateway(testActor, null)
+      val selfGateway = new AkkaActorGateway(testActor, HighAvailabilityServices.DEFAULT_LEADER_ID)
 
       val resourceID = ResourceID.generate()
       val connectionInfo = new TaskManagerLocation(resourceID, InetAddress.getLocalHost, 1)
@@ -155,17 +155,23 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
           selfGateway)
 
         expectMsgType[LeaderSessionMessage] match {
-          case LeaderSessionMessage(null, AcknowledgeRegistration(_, _)) =>
+          case LeaderSessionMessage(
+            HighAvailabilityServices.DEFAULT_LEADER_ID,
+            AcknowledgeRegistration(_, _)) =>
           case m => fail("Wrong message type: " + m)
         }
 
         expectMsgType[LeaderSessionMessage] match {
-          case LeaderSessionMessage(null, AlreadyRegistered(_, _)) =>
+          case LeaderSessionMessage(
+            HighAvailabilityServices.DEFAULT_LEADER_ID,
+            AlreadyRegistered(_, _)) =>
           case m => fail("Wrong message type: " + m)
         }
 
         expectMsgType[LeaderSessionMessage] match {
-          case LeaderSessionMessage(null, AlreadyRegistered(_, _)) =>
+          case LeaderSessionMessage(
+            HighAvailabilityServices.DEFAULT_LEADER_ID,
+            AlreadyRegistered(_, _)) =>
           case m => fail("Wrong message type: " + m)
         }
       }
@@ -182,7 +188,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
       None,
       classOf[JobManager],
       classOf[MemoryArchivist])
-    new AkkaActorGateway(jm, null)
+    new AkkaActorGateway(jm, HighAvailabilityServices.DEFAULT_LEADER_ID)
   }
 
   private def startTestingResourceManager(system: ActorSystem, jm: ActorRef): ActorGateway = {
@@ -193,7 +199,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
       _system,
       LeaderRetrievalUtils.createLeaderRetrievalService(config, jm),
       classOf[TestingResourceManager])
-    new AkkaActorGateway(rm, null)
+    new AkkaActorGateway(rm, HighAvailabilityServices.DEFAULT_LEADER_ID)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/ScalaTestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/ScalaTestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/ScalaTestingUtils.scala
index d46dd71..d4ca8f6 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/ScalaTestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/ScalaTestingUtils.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.testingUtils
 
 import akka.actor.ActorRef
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices
 import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway}
 
 /** Mixing for testing utils
@@ -32,6 +33,6 @@ trait ScalaTestingUtils {
     * @return [[ActorGateway]] encapsulating the given [[ActorRef]]
     */
   implicit def actorRef2InstanceGateway(actor: ActorRef): ActorGateway = {
-    new AkkaActorGateway(actor, null)
+    new AkkaActorGateway(actor, HighAvailabilityServices.DEFAULT_LEADER_ID)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2dfd463e/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 20260c7..d6221f5 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.client.JobClient
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices
 import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway}
 import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist}
@@ -45,7 +46,7 @@ import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMess
 
 import scala.concurrent.duration.TimeUnit
 import scala.concurrent.duration._
-import scala.concurrent.{ExecutionContextExecutor, Await, ExecutionContext}
+import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor}
 import scala.language.postfixOps
 
 /**
@@ -326,7 +327,7 @@ object TestingUtils {
       Await.ready(notificationResult, TESTING_DURATION)
     }
 
-    new AkkaActorGateway(taskManager, null)
+    new AkkaActorGateway(taskManager, HighAvailabilityServices.DEFAULT_LEADER_ID)
   }
 
   /** Stops the given actor by sending it a Kill message
@@ -456,7 +457,7 @@ object TestingUtils {
         jobManagerClass,
         classOf[MemoryArchivist])
 
-    new AkkaActorGateway(actor, null)
+    new AkkaActorGateway(actor, HighAvailabilityServices.DEFAULT_LEADER_ID)
   }
 
   /** Creates a forwarding JobManager which sends all received message to the forwarding target.
@@ -478,7 +479,7 @@ object TestingUtils {
           Props(
             classOf[ForwardingActor],
             forwardingTarget,
-            None),
+            Option(HighAvailabilityServices.DEFAULT_LEADER_ID)),
           name
         )
       case None =>
@@ -486,11 +487,11 @@ object TestingUtils {
           Props(
             classOf[ForwardingActor],
             forwardingTarget,
-            None)
+            Option(HighAvailabilityServices.DEFAULT_LEADER_ID))
         )
     }
 
-    new AkkaActorGateway(actor, null)
+    new AkkaActorGateway(actor, HighAvailabilityServices.DEFAULT_LEADER_ID)
   }
 
   def submitJobAndWait(
@@ -537,7 +538,7 @@ object TestingUtils {
       LeaderRetrievalUtils.createLeaderRetrievalService(configuration, jobManager),
       classOf[TestingResourceManager])
 
-    new AkkaActorGateway(actor, null)
+    new AkkaActorGateway(actor, HighAvailabilityServices.DEFAULT_LEADER_ID)
   }