You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/05 11:48:08 UTC

[03/16] flink git commit: [FLINK-6136] Separate EmbeddedHaServices and StandaloneHaServices

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
deleted file mode 100644
index ab1ce47..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.highavailability.nonha;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
-
-import java.util.HashMap;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A {@link RunningJobsRegistry} for the non-highly-available case that keeps the scheduling
- * status of each job in an in-memory map only. Nothing is persisted, so the registry's
- * content does not outlive the instance.
- *
- * <p>Every method synchronizes on the internal map, so one instance may be shared between threads.
- */
-public class NonHaRegistry implements RunningJobsRegistry {
-
-	/** Scheduling status per job, guarded by synchronizing on this map itself.
-	 * Jobs without an entry are reported as PENDING by getJobSchedulingStatus(). */
-	private final HashMap<JobID, JobSchedulingStatus> jobStatus = new HashMap<>();
-
-	/**
-	 * Marks the given job as RUNNING, overwriting any previously recorded status.
-	 *
-	 * @param jobID The job to mark; must not be null.
-	 */
-	@Override
-	public void setJobRunning(JobID jobID) {
-		checkNotNull(jobID);
-
-		synchronized (jobStatus) {
-			jobStatus.put(jobID, JobSchedulingStatus.RUNNING);
-		}
-	}
-
-	/**
-	 * Marks the given job as DONE, overwriting any previously recorded status.
-	 *
-	 * @param jobID The job to mark; must not be null.
-	 */
-	@Override
-	public void setJobFinished(JobID jobID) {
-		checkNotNull(jobID);
-
-		synchronized (jobStatus) {
-			jobStatus.put(jobID, JobSchedulingStatus.DONE);
-		}
-	}
-
-	/**
-	 * Returns the recorded scheduling status of the given job.
-	 *
-	 * @param jobID The job to look up; must not be null.
-	 * @return The recorded status, or PENDING if nothing was recorded (or it was cleared).
-	 */
-	@Override
-	public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) {
-		checkNotNull(jobID);
-		
-		synchronized (jobStatus) {
-			JobSchedulingStatus status = jobStatus.get(jobID);
-			// absence of an entry means the job was never marked (or was cleared)
-			return status == null ? JobSchedulingStatus.PENDING : status;
-		}
-	}
-
-	/**
-	 * Forgets the status of the given job, so that it is reported as PENDING again.
-	 *
-	 * @param jobID The job to clear; must not be null.
-	 */
-	@Override
-	public void clearJob(JobID jobID) {
-		checkNotNull(jobID);
-
-		synchronized (jobStatus) {
-			jobStatus.remove(jobID);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java
new file mode 100644
index 0000000..a338edc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha.embedded;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.nonha.AbstractNonHaServices;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.util.HashMap;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link HighAvailabilityServices} for the non-high-availability case
+ * where all participants (ResourceManager, JobManagers, TaskManagers) run in the same process.
+ *
+ * <p>Leadership for the ResourceManager and for every JobManager is decided by an in-process
+ * {@link EmbeddedLeaderService}; no external coordination service is involved. The remaining
+ * services come from {@link AbstractNonHaServices} and, per the original documentation, keep
+ * their data without durability guarantees (NOTE(review): confirm against AbstractNonHaServices).
+ */
+public class EmbeddedHaServices extends AbstractNonHaServices {
+
+	/** Executor handed to every embedded leader service for dispatching notifications. */
+	private final Executor executor;
+
+	/** The single leader service shared by all ResourceManager contenders and listeners. */
+	private final EmbeddedLeaderService resourceManagerLeaderService;
+
+	/** Leader services per job, created on first request; guarded by {@code lock}. */
+	private final HashMap<JobID, EmbeddedLeaderService> jobManagerLeaderServices;
+
+	/**
+	 * Creates the embedded HA services.
+	 *
+	 * @param executor Executor used by all leader services to deliver notifications; not null.
+	 */
+	public EmbeddedHaServices(Executor executor) {
+		this.executor = Preconditions.checkNotNull(executor);
+		this.jobManagerLeaderServices = new HashMap<>();
+		this.resourceManagerLeaderService = new EmbeddedLeaderService(executor);
+	}
+
+	// ------------------------------------------------------------------------
+	//  leader election and retrieval
+	// ------------------------------------------------------------------------
+
+	@Override
+	public LeaderElectionService getResourceManagerLeaderElectionService() {
+		return resourceManagerLeaderService.createLeaderElectionService();
+	}
+
+	@Override
+	public LeaderRetrievalService getResourceManagerLeaderRetriever() {
+		return resourceManagerLeaderService.createLeaderRetrievalService();
+	}
+
+	@Override
+	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
+		checkNotNull(jobID);
+
+		synchronized (lock) {
+			checkNotShutdown();
+			return getOrCreateJobManagerService(jobID).createLeaderElectionService();
+		}
+	}
+
+	@Override
+	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
+		checkNotNull(jobID);
+
+		synchronized (lock) {
+			checkNotShutdown();
+			return getOrCreateJobManagerService(jobID).createLeaderRetrievalService();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  internal
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Looks up the leader service of the given job, registering a fresh one on first access.
+	 */
+	@GuardedBy("lock")
+	private EmbeddedLeaderService getOrCreateJobManagerService(JobID jobID) {
+		final EmbeddedLeaderService existing = jobManagerLeaderServices.get(jobID);
+		if (existing != null) {
+			return existing;
+		}
+
+		final EmbeddedLeaderService created = new EmbeddedLeaderService(executor);
+		jobManagerLeaderServices.put(jobID, created);
+		return created;
+	}
+
+	/**
+	 * Shuts down every per-job leader service, forgets them, then shuts down the
+	 * ResourceManager leader service.
+	 */
+	@GuardedBy("lock")
+	private void shutdownEmbeddedLeaderServices() {
+		for (EmbeddedLeaderService jobService : jobManagerLeaderServices.values()) {
+			jobService.shutdown();
+		}
+		jobManagerLeaderServices.clear();
+
+		resourceManagerLeaderService.shutdown();
+	}
+
+	// ------------------------------------------------------------------------
+	//  shutdown
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void close() throws Exception {
+		synchronized (lock) {
+			// leader services are only torn down on the first close
+			if (!isShutDown()) {
+				shutdownEmbeddedLeaderServices();
+			}
+
+			super.close();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java
new file mode 100644
index 0000000..5eb4375
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha.embedded;
+
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A simple leader election service, which selects a leader among contenders and notifies listeners.
+ * 
+ * <p>An election service for contenders can be created via {@link #createLeaderElectionService()},
+ * a listener service for leader observers can be created via {@link #createLeaderRetrievalService()}.
+ */
+public class EmbeddedLeaderService {
+
+	private static final Logger LOG = LoggerFactory.getLogger(EmbeddedLeaderService.class);
+
+	private final Object lock = new Object();
+
+	private final Executor notificationExecutor;
+
+	private final Set<EmbeddedLeaderElectionService> allLeaderContenders;
+
+	private final Set<EmbeddedLeaderRetrievalService> listeners;
+
+	/** proposed leader, which has been notified of leadership grant, but has not confirmed */
+	private EmbeddedLeaderElectionService currentLeaderProposed;
+
+	/** actual leader that has confirmed leadership and of which listeners have been notified */
+	private EmbeddedLeaderElectionService currentLeaderConfirmed;
+
+	/** fencing UID for the current leader (or proposed leader) */
+	private UUID currentLeaderSessionId;
+
+	/** the cached address of the current leader */
+	private String currentLeaderAddress;
+
+	/** flag marking the service as terminated */
+	private boolean shutdown;
+
+	// ------------------------------------------------------------------------
+
+	public EmbeddedLeaderService(Executor notificationsDispatcher) {
+		this.notificationExecutor = checkNotNull(notificationsDispatcher);
+		this.allLeaderContenders = new HashSet<>();
+		this.listeners = new HashSet<>();
+	}
+
+	// ------------------------------------------------------------------------
+	//  shutdown and errors
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Shuts down this leader election service.
+	 * 
+	 * <p>This method does not perform a clean revocation of the leader status and
+	 * no notification to any leader listeners. It simply notifies all contenders
+	 * and listeners that the service is no longer available.
+	 */
+	public void shutdown() {
+		synchronized (lock) {
+			shutdownInternally(new Exception("Leader election service is shutting down"));
+		}
+	}
+
+	private void fatalError(Throwable error) {
+		LOG.error("Embedded leader election service encountered a fatal error. Shutting down service.", error);
+
+		synchronized (lock) {
+			shutdownInternally(new Exception("Leader election service is shutting down after a fatal error", error));
+		}
+	}
+
+	@GuardedBy("lock")
+	private void shutdownInternally(Exception exceptionForHandlers) {
+		assert Thread.holdsLock(lock);
+
+		if (!shutdown) {
+			// clear all leader status
+			currentLeaderProposed = null;
+			currentLeaderConfirmed = null;
+			currentLeaderSessionId = null;
+			currentLeaderAddress = null;
+
+			// fail all registered listeners
+			for (EmbeddedLeaderElectionService service : allLeaderContenders) {
+				service.shutdown(exceptionForHandlers);
+			}
+			allLeaderContenders.clear();
+
+			// fail all registered listeners
+			for (EmbeddedLeaderRetrievalService service : listeners) {
+				service.shutdown(exceptionForHandlers);
+			}
+			listeners.clear();
+
+			shutdown = true;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  creating contenders and listeners
+	// ------------------------------------------------------------------------
+
+	public LeaderElectionService createLeaderElectionService() {
+		checkState(!shutdown, "leader election service is shut down");
+		return new EmbeddedLeaderElectionService();
+	}
+
+	public LeaderRetrievalService createLeaderRetrievalService() {
+		checkState(!shutdown, "leader election service is shut down");
+		return new EmbeddedLeaderRetrievalService();
+	}
+
+	// ------------------------------------------------------------------------
+	//  adding and removing contenders & listeners
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Callback from leader contenders when they start their service.
+	 */
+	void addContender(EmbeddedLeaderElectionService service, LeaderContender contender) {
+		synchronized (lock) {
+			checkState(!shutdown, "leader election service is shut down");
+			checkState(!service.running, "leader election service is already started");
+
+			try {
+				if (!allLeaderContenders.add(service)) {
+					throw new IllegalStateException("leader election service was added to this service multiple times");
+				}
+
+				service.contender = contender;
+				service.running = true;
+
+				updateLeader();
+			}
+			catch (Throwable t) {
+				fatalError(t);
+			}
+		}
+	}
+
+	/**
+	 * Callback from leader contenders when they stop their service.
+	 */
+	void removeContender(EmbeddedLeaderElectionService service) {
+		synchronized (lock) {
+			// if the service was not even started, simply do nothing
+			if (!service.running || shutdown) {
+				return;
+			}
+
+			try {
+				if (!allLeaderContenders.remove(service)) {
+					throw new IllegalStateException("leader election service does not belong to this service");
+				}
+
+				// stop the service
+				service.contender = null;
+				service.running = false;
+				service.isLeader = false;
+
+				// if that was the current leader, unset its status
+				if (currentLeaderConfirmed == service) {
+					currentLeaderConfirmed = null;
+					currentLeaderSessionId = null;
+					currentLeaderAddress = null;
+				}
+				if (currentLeaderProposed == service) {
+					currentLeaderProposed = null;
+					currentLeaderSessionId = null;
+				}
+
+				updateLeader();
+			}
+			catch (Throwable t) {
+				fatalError(t);
+			}
+		}
+	}
+
+	/**
+	 * Callback from leader contenders when they confirm a leader grant
+	 */
+	void confirmLeader(final EmbeddedLeaderElectionService service, final UUID leaderSessionId) {
+		synchronized (lock) {
+			// if the service was shut down in the meantime, ignore this confirmation
+			if (!service.running || shutdown) {
+				return;
+			}
+
+			try {
+				// check if the confirmation is for the same grant, or whether it is a stale grant 
+				if (service == currentLeaderProposed && currentLeaderSessionId.equals(leaderSessionId)) {
+					final String address = service.contender.getAddress();
+					LOG.info("Received confirmation of leadership for leader {} , session={}", address, leaderSessionId);
+
+					// mark leadership
+					currentLeaderConfirmed = service;
+					currentLeaderAddress = address;
+					currentLeaderProposed = null;
+					service.isLeader = true;
+
+					// notify all listeners
+					for (EmbeddedLeaderRetrievalService listener : listeners) {
+						notificationExecutor.execute(
+								new NotifyOfLeaderCall(address, leaderSessionId, listener.listener, LOG));
+					}
+				}
+				else {
+					LOG.debug("Received confirmation of leadership for a stale leadership grant. Ignoring.");
+				}
+			}
+			catch (Throwable t) {
+				fatalError(t);
+			}
+		}
+	}
+
+	@GuardedBy("lock")
+	private void updateLeader() {
+		// this must be called under the lock
+		assert Thread.holdsLock(lock);
+
+		if (currentLeaderConfirmed == null && currentLeaderProposed == null) {
+			// we need a new leader
+			if (allLeaderContenders.isEmpty()) {
+				// no new leader available, tell everyone that there is no leader currently
+				for (EmbeddedLeaderRetrievalService listener : listeners) {
+					notificationExecutor.execute(
+							new NotifyOfLeaderCall(null, null, listener.listener, LOG));
+				}
+			}
+			else {
+				// propose a leader and ask it
+				final UUID leaderSessionId = UUID.randomUUID();
+				EmbeddedLeaderElectionService leaderService = allLeaderContenders.iterator().next();
+
+				currentLeaderSessionId = leaderSessionId;
+				currentLeaderProposed = leaderService;
+
+				LOG.info("Proposing leadership to contender {} @ {}",
+						leaderService.contender, leaderService.contender.getAddress());
+
+				notificationExecutor.execute(
+						new GrantLeadershipCall(leaderService.contender, leaderSessionId, LOG));
+			}
+		}
+	}
+
+	void addListener(EmbeddedLeaderRetrievalService service, LeaderRetrievalListener listener) {
+		synchronized (lock) {
+			checkState(!shutdown, "leader election service is shut down");
+			checkState(!service.running, "leader retrieval service is already started");
+
+			try {
+				if (!listeners.add(service)) {
+					throw new IllegalStateException("leader retrieval service was added to this service multiple times");
+				}
+
+				service.listener = listener;
+				service.running = true;
+
+				// if we already have a leader, immediately notify this new listener
+				if (currentLeaderConfirmed != null) {
+					notificationExecutor.execute(
+							new NotifyOfLeaderCall(currentLeaderAddress, currentLeaderSessionId, listener, LOG));
+				}
+			}
+			catch (Throwable t) {
+				fatalError(t);
+			}
+		}
+	}
+
+	void removeListener(EmbeddedLeaderRetrievalService service) {
+		synchronized (lock) {
+			// if the service was not even started, simply do nothing
+			if (!service.running || shutdown) {
+				return;
+			}
+
+			try {
+				if (!listeners.remove(service)) {
+					throw new IllegalStateException("leader retrieval service does not belong to this service");
+				}
+
+				// stop the service
+				service.listener = null;
+				service.running = false;
+			}
+			catch (Throwable t) {
+				fatalError(t);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  election and retrieval service implementations 
+	// ------------------------------------------------------------------------
+
+	private class EmbeddedLeaderElectionService implements LeaderElectionService {
+
+		volatile LeaderContender contender;
+
+		volatile boolean isLeader;
+
+		volatile boolean running;
+
+		@Override
+		public void start(LeaderContender contender) throws Exception {
+			checkNotNull(contender);
+			addContender(this, contender);
+		}
+
+		@Override
+		public void stop() throws Exception {
+			removeContender(this);
+		}
+
+		@Override
+		public void confirmLeaderSessionID(UUID leaderSessionID) {
+			checkNotNull(leaderSessionID);
+			confirmLeader(this, leaderSessionID);
+		}
+
+		@Override
+		public boolean hasLeadership() {
+			return isLeader;
+		}
+
+		void shutdown(Exception cause) {
+			if (running) {
+				running = false;
+				isLeader = false;
+				contender.handleError(cause);
+				contender = null;
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private class EmbeddedLeaderRetrievalService implements LeaderRetrievalService {
+
+		volatile LeaderRetrievalListener listener;
+
+		volatile boolean running;
+
+		@Override
+		public void start(LeaderRetrievalListener listener) throws Exception {
+			checkNotNull(listener);
+			addListener(this, listener);
+		}
+
+		@Override
+		public void stop() throws Exception {
+			removeListener(this);
+		}
+
+		public void shutdown(Exception cause) {
+			if (running) {
+				running = false;
+				listener.handleError(cause);
+				listener = null;
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  asynchronous notifications
+	// ------------------------------------------------------------------------
+
+	private static class NotifyOfLeaderCall implements Runnable {
+
+		@Nullable
+		private final String address;       // null if leader revoked without new leader
+		@Nullable
+		private final UUID leaderSessionId; // null if leader revoked without new leader
+
+		private final LeaderRetrievalListener listener;
+		private final Logger logger;
+
+		NotifyOfLeaderCall(
+				@Nullable String address,
+				@Nullable UUID leaderSessionId,
+				LeaderRetrievalListener listener,
+				Logger logger) {
+
+			this.address = address;
+			this.leaderSessionId = leaderSessionId;
+			this.listener = checkNotNull(listener);
+			this.logger = checkNotNull(logger);
+		}
+
+		@Override
+		public void run() {
+			try {
+				listener.notifyLeaderAddress(address, leaderSessionId);
+			}
+			catch (Throwable t) {
+				logger.warn("Error notifying leader listener about new leader", t);
+				listener.handleError(t instanceof Exception ? (Exception) t : new Exception(t));
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static class GrantLeadershipCall implements Runnable {
+
+		private final LeaderContender contender;
+		private final UUID leaderSessionId;
+		private final Logger logger;
+
+		GrantLeadershipCall(
+				LeaderContender contender,
+				UUID leaderSessionId,
+				Logger logger) {
+
+			this.contender = checkNotNull(contender);
+			this.leaderSessionId = checkNotNull(leaderSessionId);
+			this.logger = checkNotNull(logger);
+		}
+
+		@Override
+		public void run() {
+			try {
+				contender.grantLeadership(leaderSessionId);
+			}
+			catch (Throwable t) {
+				logger.warn("Error granting leadership to contender", t);
+				contender.handleError(t instanceof Exception ? (Exception) t : new Exception(t));
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionService.java
new file mode 100644
index 0000000..a56b077
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/leaderelection/SingleLeaderElectionService.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha.leaderelection;
+
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import java.util.HashSet;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An implementation of the {@link LeaderElectionService} interface that handles a single
+ * leader contender. When started, this service immediately grants the contender the leadership.
+ * 
+ * <p>The implementation accepts a single static leader session ID and is hence compatible with
+ * pre-configured single leader (no leader failover) setups.
+ * 
+ * <p>This implementation supports a series of leader listeners that receive notifications about
+ * the leader contender.
+ */
+public class SingleLeaderElectionService implements LeaderElectionService {
+
+	private static final Logger LOG = LoggerFactory.getLogger(SingleLeaderElectionService.class);
+
+	// ------------------------------------------------------------------------
+
+	/** Lock guarding all mutable state of this instance, including the listener set. */
+	private final Object lock = new Object();
+
+	/** The executor that dispatches leadership grants and leader notifications. */
+	private final Executor notificationExecutor;
+
+	/** The constant leader session ID granted to the contender; confirmations must match it. */
+	private final UUID leaderId;
+
+	/** The retrieval services that have been started and not yet stopped. */
+	@GuardedBy("lock")
+	private final HashSet<EmbeddedLeaderRetrievalService> listeners;
+
+	/** The contender granted leadership by start(); it is not cleared on confirmation,
+	 * so once confirmed it is the same object as {@code leader}. */
+	@GuardedBy("lock")
+	private volatile LeaderContender proposedLeader;
+
+	/** The contender that confirmed leadership, or null if none has (yet). */
+	@GuardedBy("lock")
+	private volatile LeaderContender leader;
+
+	/** The address of the confirmed leader, captured when leadership was confirmed. */
+	@GuardedBy("lock")
+	private volatile String leaderAddress;
+
+	/** Flag marking this service as shutdown, meaning it cannot be started again.
+	 * Volatile because isShutdown() and createLeaderRetrievalService() read it without the lock. */
+	@GuardedBy("lock")
+	private volatile boolean shutdown;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new leader election service that grants the given, constant leader session ID
+	 * to whichever contender gets started.
+	 *
+	 * @param notificationsDispatcher Executor that delivers leadership grants and leader notifications.
+	 * @param leaderId The constant leader ID assigned to the leader.
+	 */
+	public SingleLeaderElectionService(Executor notificationsDispatcher, UUID leaderId) {
+		this.notificationExecutor = checkNotNull(notificationsDispatcher);
+		this.leaderId = checkNotNull(leaderId);
+		this.listeners = new HashSet<EmbeddedLeaderRetrievalService>();
+
+		// explicit initial state: the service starts out usable
+		this.shutdown = false;
+	}
+
+	// ------------------------------------------------------------------------
+	//  leader election service
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Starts the service with the given contender and grants it leadership right away
+	 * (dispatched through the notification executor) under the constant leader session ID.
+	 *
+	 * @throws IllegalStateException if the service is shut down or already has a contender
+	 */
+	@Override
+	public void start(LeaderContender contender) throws Exception {
+		checkNotNull(contender, "contender");
+
+		synchronized (lock) {
+			checkState(!shutdown, "service is shut down");
+			checkState(proposedLeader == null, "service already started");
+
+			proposedLeader = contender;
+
+			final GrantLeadershipCall grant = new GrantLeadershipCall(contender, leaderId);
+			notificationExecutor.execute(grant);
+		}
+	}
+
+	/**
+	 * Stops the service. All listeners are told (asynchronously) that there is no leader, a
+	 * confirmed leader has its leadership revoked (a failure there is routed to its error
+	 * handler), and all leader state is forgotten so that the service can be started again.
+	 */
+	@Override
+	public void stop() {
+		synchronized (lock) {
+			for (EmbeddedLeaderRetrievalService retrievalService : listeners) {
+				final NotifyOfLeaderCall noLeader =
+						new NotifyOfLeaderCall(null, null, retrievalService.listener, LOG);
+				notificationExecutor.execute(noLeader);
+			}
+
+			if (leader != null) {
+				try {
+					leader.revokeLeadership();
+				} catch (Throwable t) {
+					final Exception cause = t instanceof Exception ? (Exception) t : new Exception(t);
+					leader.handleError(cause);
+				}
+			}
+
+			proposedLeader = null;
+			leader = null;
+			leaderAddress = null;
+		}
+	}
+
+	/**
+	 * Confirms the leadership of the started contender. The given ID must equal this service's
+	 * constant leader ID; on success, every started listener is notified of the leader's address.
+	 *
+	 * @throws IllegalArgumentException if the session ID does not match
+	 * @throws IllegalStateException if the service is shut down, not started, or already confirmed
+	 */
+	@Override
+	public void confirmLeaderSessionID(UUID leaderSessionID) {
+		checkNotNull(leaderSessionID, "leaderSessionID");
+		checkArgument(leaderSessionID.equals(leaderId), "confirmed wrong leader session id");
+
+		synchronized (lock) {
+			checkState(!shutdown, "service is shut down");
+			checkState(proposedLeader != null, "no leader proposed yet");
+			checkState(leader == null, "leader already confirmed");
+
+			final String confirmedAddress = proposedLeader.getAddress();
+			leaderAddress = confirmedAddress;
+			leader = proposedLeader;
+
+			for (EmbeddedLeaderRetrievalService retrievalService : listeners) {
+				final NotifyOfLeaderCall announcement =
+						new NotifyOfLeaderCall(confirmedAddress, leaderId, retrievalService.listener, LOG);
+				notificationExecutor.execute(announcement);
+			}
+		}
+	}
+
+	/**
+	 * Checks whether a contender has confirmed leadership and has not been stopped since.
+	 */
+	@Override
+	public boolean hasLeadership() {
+		final boolean confirmed;
+		synchronized (lock) {
+			confirmed = leader != null;
+		}
+		return confirmed;
+	}
+
+	/**
+	 * Handles a failure of the asynchronous leadership grant to the given contender.
+	 *
+	 * <p>If the contender is still the proposed leader, the leader state is reset first, so that
+	 * a throwing error handler cannot leave a stale proposed leader behind (which would make
+	 * every later {@link #start(LeaderContender)} fail). Afterwards the error is forwarded to
+	 * the contender.
+	 *
+	 * @param contender The contender whose leadership grant failed.
+	 * @param error The failure raised while granting leadership.
+	 */
+	void errorOnGrantLeadership(LeaderContender contender, Throwable error) {
+		LOG.warn("Error granting leadership to contender", error);
+
+		synchronized (lock) {
+			if (proposedLeader == contender) {
+				proposedLeader = null;
+				leader = null;
+				leaderAddress = null;
+			}
+		}
+
+		contender.handleError(error instanceof Exception ? (Exception) error : new Exception(error));
+	}
+
+	// ------------------------------------------------------------------------
+	//  shutdown
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Checks whether this service has been shut down, either explicitly or after a fatal error.
+	 *
+	 * @return True once the service is shut down; it can then no longer be started.
+	 */
+	public boolean isShutdown() {
+		return this.shutdown;
+	}
+
+	/**
+	 * Shuts the service down, failing the confirmed leader and all started listeners.
+	 * Calls after the first one have no further effect.
+	 */
+	public void shutdown() {
+		final Exception reason = new Exception("The leader service is shutting down");
+		shutdownInternally(reason);
+	}
+
+	/**
+	 * Marks the service as shut down (only the first call has an effect), fails the confirmed
+	 * leader and all started listeners with the given exception, and clears all leader state.
+	 *
+	 * <p>Failures thrown by individual error handlers are logged rather than silently dropped,
+	 * and they do not prevent the remaining listeners from being notified or the state from
+	 * being cleared.
+	 */
+	private void shutdownInternally(Exception exceptionForHandlers) {
+		synchronized (lock) {
+			if (shutdown) {
+				return;
+			}
+
+			shutdown = true;
+
+			// fail the leader (if there is one)
+			if (leader != null) {
+				try {
+					leader.handleError(exceptionForHandlers);
+				} catch (Throwable t) {
+					LOG.warn("Error while notifying the leader about the shutdown", t);
+				}
+			}
+
+			// clear all leader status
+			leader = null;
+			proposedLeader = null;
+			leaderAddress = null;
+
+			// fail all registered listeners; one failing listener must not skip the others
+			for (EmbeddedLeaderRetrievalService service : listeners) {
+				try {
+					service.shutdown(exceptionForHandlers);
+				} catch (Throwable t) {
+					LOG.warn("Error while notifying a leader listener about the shutdown", t);
+				}
+			}
+			listeners.clear();
+		}
+	}
+
+	private void fatalError(Throwable error) {
+		LOG.error("Embedded leader election service encountered a fatal error. Shutting down service.", error);
+
+		shutdownInternally(new Exception("Leader election service is shutting down after a fatal error", error));
+	}
+
+	// ------------------------------------------------------------------------
+	//  leader listeners
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new, not yet started retrieval service that observes this service's leader.
+	 *
+	 * @throws IllegalStateException if this service is already shut down
+	 */
+	public LeaderRetrievalService createLeaderRetrievalService() {
+		checkState(!shutdown, "leader election service is shut down");
+
+		final EmbeddedLeaderRetrievalService retrievalService = new EmbeddedLeaderRetrievalService();
+		return retrievalService;
+	}
+
+	/**
+	 * Registers a retrieval service together with its listener and marks the service as running.
+	 * If a leader is already set, the new listener is notified asynchronously through the
+	 * notification executor.
+	 *
+	 * <p>The two precondition checks are thrown to the caller. Any other error during
+	 * registration is treated as fatal and shuts down this whole service via {@code fatalError}.
+	 *
+	 * @param service the retrieval service being started
+	 * @param listener the listener to notify about leader changes
+	 * @throws IllegalStateException if this service is shut down or the retrieval service is
+	 *                               already running
+	 */
+	void addListener(EmbeddedLeaderRetrievalService service, LeaderRetrievalListener listener) {
+		synchronized (lock) {
+			checkState(!shutdown, "leader election service is shut down");
+			checkState(!service.running, "leader retrieval service is already started");
+
+			try {
+				// defensive check against double registration; only effective if 'listeners'
+				// rejects duplicates (presumably a Set) - TODO confirm its type
+				if (!listeners.add(service)) {
+					throw new IllegalStateException("leader retrieval service was added to this service multiple times");
+				}
+
+				service.listener = listener;
+				service.running = true;
+
+				// if we already have a leader, immediately notify this new listener
+				if (leader != null) {
+					notificationExecutor.execute(
+							new NotifyOfLeaderCall(leaderAddress, leaderId, listener, LOG));
+				}
+			}
+			catch (Throwable t) {
+				// an inconsistent registry state is unrecoverable: shut the service down
+				fatalError(t);
+			}
+		}
+	}
+
+	void removeListener(EmbeddedLeaderRetrievalService service) {
+		synchronized (lock) {
+			// if the service was not even started, simply do nothing
+			if (!service.running || shutdown) {
+				return;
+			}
+
+			try {
+				if (!listeners.remove(service)) {
+					throw new IllegalStateException("leader retrieval service does not belong to this service");
+				}
+
+				// stop the service
+				service.listener = null;
+				service.running = false;
+			}
+			catch (Throwable t) {
+				fatalError(t);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private class EmbeddedLeaderRetrievalService implements LeaderRetrievalService {
+
+		volatile LeaderRetrievalListener listener;
+
+		volatile boolean running;
+
+		@Override
+		public void start(LeaderRetrievalListener listener) throws Exception {
+			checkNotNull(listener);
+			addListener(this, listener);
+		}
+
+		@Override
+		public void stop() throws Exception {
+			removeListener(this);
+		}
+
+		void shutdown(Exception cause) {
+			if (running) {
+				final LeaderRetrievalListener lst = listener;
+				running = false;
+				listener = null;
+
+				try {
+					lst.handleError(cause);
+				} catch (Throwable ignored) {}
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  asynchronous notifications
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Runnable that grants leadership to a contender under a given leader session ID.
+	 * If the contender throws, the failure is reported via {@code errorOnGrantLeadership}.
+	 */
+	private class GrantLeadershipCall implements Runnable {
+
+		/** The contender that receives leadership. */
+		private final LeaderContender contender;
+
+		/** The leader session ID handed to the contender. */
+		private final UUID leaderSessionId;
+
+		GrantLeadershipCall(LeaderContender contender, UUID leaderSessionId) {
+			this.contender = checkNotNull(contender);
+			this.leaderSessionId = checkNotNull(leaderSessionId);
+		}
+
+		@Override
+		public void run() {
+			try {
+				contender.grantLeadership(leaderSessionId);
+			} catch (Throwable failure) {
+				errorOnGrantLeadership(contender, failure);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Runnable that informs a leader retrieval listener about the current leader.
+	 * Failures of the listener are logged and forwarded to the listener's error handler.
+	 */
+	private static class NotifyOfLeaderCall implements Runnable {
+
+		/** Leader address; null if leadership was revoked without a new leader. */
+		@Nullable
+		private final String address;
+
+		/** Leader session ID; null if leadership was revoked without a new leader. */
+		@Nullable
+		private final UUID leaderSessionId;
+
+		/** The listener to notify. */
+		private final LeaderRetrievalListener listener;
+
+		/** Logger used to report listener failures. */
+		private final Logger logger;
+
+		NotifyOfLeaderCall(
+				@Nullable String address,
+				@Nullable UUID leaderSessionId,
+				LeaderRetrievalListener listener,
+				Logger logger) {
+
+			this.address = address;
+			this.leaderSessionId = leaderSessionId;
+			this.listener = checkNotNull(listener);
+			this.logger = checkNotNull(logger);
+		}
+
+		@Override
+		public void run() {
+			try {
+				listener.notifyLeaderAddress(address, leaderSessionId);
+			} catch (Throwable t) {
+				logger.warn("Error notifying leader listener about new leader", t);
+				listener.handleError(asException(t));
+			}
+		}
+
+		/** Returns the throwable itself if it is an {@link Exception}, otherwise wraps it in one. */
+		private static Exception asException(Throwable t) {
+			if (t instanceof Exception) {
+				return (Exception) t;
+			}
+			return new Exception(t);
+		}
+	}
+
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java
new file mode 100644
index 0000000..c7b54c3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha.standalone;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.nonha.AbstractNonHaServices;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link HighAvailabilityServices} for the non-high-availability case.
+ * This implementation can be used for testing, and for cluster setups that do not
+ * tolerate failures of the master processes (JobManager, ResourceManager).
+ *
+ * <p>This implementation has no dependencies on any external services. It returns fixed,
+ * pre-configured ResourceManager and JobManager addresses, and stores checkpoints and metadata
+ * simply on the heap or on a local file system and therefore in a storage without guarantees.
+ */
+public class StandaloneHaServices extends AbstractNonHaServices {
+
+	/** The fixed address of the ResourceManager */
+	private final String resourceManagerAddress;
+
+	/** The fixed address of the JobManager */
+	private final String jobManagerAddress;
+
+	/**
+	 * Creates a new services instance for the fixed, pre-defined leaders.
+	 *
+	 * @param resourceManagerAddress    The fixed address of the ResourceManager
+	 * @param jobManagerAddress         The fixed address of the JobManager (shared by all jobs)
+	 * @throws NullPointerException if either address is null
+	 */
+	public StandaloneHaServices(String resourceManagerAddress, String jobManagerAddress) {
+		this.resourceManagerAddress = checkNotNull(resourceManagerAddress, "resourceManagerAddress");
+		this.jobManagerAddress = checkNotNull(jobManagerAddress, "jobManagerAddress");
+	}
+
+	// ------------------------------------------------------------------------
+	//  Services
+	// ------------------------------------------------------------------------
+
+	@Override
+	public LeaderRetrievalService getResourceManagerLeaderRetriever() {
+		synchronized (lock) {
+			checkNotShutdown();
+
+			return new StandaloneLeaderRetrievalService(resourceManagerAddress, DEFAULT_LEADER_ID);
+		}
+	}
+
+	@Override
+	public LeaderElectionService getResourceManagerLeaderElectionService() {
+		synchronized (lock) {
+			checkNotShutdown();
+
+			return new StandaloneLeaderElectionService();
+		}
+	}
+
+	@Override
+	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
+		synchronized (lock) {
+			checkNotShutdown();
+
+			// all jobs share the same fixed JobManager address, so the job ID is not needed
+			return new StandaloneLeaderRetrievalService(jobManagerAddress, DEFAULT_LEADER_ID);
+		}
+	}
+
+	@Override
+	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
+		synchronized (lock) {
+			checkNotShutdown();
+
+			return new StandaloneLeaderElectionService();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneRunningJobsRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneRunningJobsRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneRunningJobsRegistry.java
new file mode 100644
index 0000000..585ef34
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneRunningJobsRegistry.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha.standalone;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+
+import java.util.HashMap;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link RunningJobsRegistry} that keeps the scheduling status of jobs in an in-memory map only,
+ * and is therefore not highly available. All accesses synchronize on the internal map.
+ */
+public class StandaloneRunningJobsRegistry implements RunningJobsRegistry {
+
+	/** Scheduling status per job; jobs without an entry are reported as PENDING. Guarded by itself. */
+	private final HashMap<JobID, JobSchedulingStatus> jobStatus = new HashMap<>();
+
+	@Override
+	public void setJobRunning(JobID jobID) {
+		updateStatus(checkNotNull(jobID), JobSchedulingStatus.RUNNING);
+	}
+
+	@Override
+	public void setJobFinished(JobID jobID) {
+		updateStatus(checkNotNull(jobID), JobSchedulingStatus.DONE);
+	}
+
+	@Override
+	public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) {
+		checkNotNull(jobID);
+
+		final JobSchedulingStatus recorded;
+		synchronized (jobStatus) {
+			recorded = jobStatus.get(jobID);
+		}
+		return (recorded != null) ? recorded : JobSchedulingStatus.PENDING;
+	}
+
+	@Override
+	public void clearJob(JobID jobID) {
+		checkNotNull(jobID);
+
+		synchronized (jobStatus) {
+			jobStatus.remove(jobID);
+		}
+	}
+
+	/** Records the given status for the job, replacing any previously recorded status. */
+	private void updateStatus(JobID jobID, JobSchedulingStatus status) {
+		synchronized (jobStatus) {
+			jobStatus.put(jobID, status);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
new file mode 100644
index 0000000..5d895c1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.zookeeper;
+
+import org.apache.curator.framework.CuratorFramework;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobStore;
+import org.apache.flink.runtime.blob.FileSystemBlobStore;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointRecoveryFactory;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+
+import java.io.IOException;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
+
+/**
+ * An implementation of the {@link HighAvailabilityServices} using Apache ZooKeeper.
+ * The services store data in ZooKeeper's nodes as illustrated by the following tree structure:
+ *
+ * <pre>
+ * /flink
+ *      +/cluster_id_1/resource_manager_lock
+ *      |            |
+ *      |            +/job-id-1/job_manager_lock
+ *      |            |         /checkpoints/latest
+ *      |            |                     /latest-1
+ *      |            |                     /latest-2
+ *      |            |
+ *      |            +/job-id-2/job_manager_lock
+ *      |
+ *      +/cluster_id_2/resource_manager_lock
+ *                   |
+ *                   +/job-id-1/job_manager_lock
+ *                            |/checkpoints/latest
+ *                            |            /latest-1
+ *                            |/persisted_job_graph
+ * </pre>
+ *
+ * <p>The root path "/flink" is configurable via the option {@link HighAvailabilityOptions#HA_ZOOKEEPER_ROOT}.
+ * This makes sure Flink stores its data under specific subtrees in ZooKeeper, for example to
+ * accommodate specific permissions.
+ *
+ * <p>The "cluster_id" part identifies the data stored for a specific Flink "cluster".
+ * This "cluster" can be either a standalone or containerized Flink cluster, or it can be a job
+ * on a framework like YARN or Mesos (in a "per-job-cluster" mode).
+ *
+ * <p>In case of a "per-job-cluster" on YARN or Mesos, the cluster-id is generated and configured
+ * automatically by the client or dispatcher that submits the Job to YARN or Mesos.
+ *
+ * <p>In the case of a standalone cluster, that cluster-id needs to be configured via
+ * {@link HighAvailabilityOptions#HA_CLUSTER_ID}. All nodes with the same cluster id will join the same
+ * cluster and participate in the execution of the same set of jobs.
+ */
+public class ZooKeeperHaServices implements HighAvailabilityServices {
+
+	/** Path of the ResourceManager leader lock node */
+	private static final String RESOURCE_MANAGER_LEADER_PATH = "/resource_manager_lock";
+
+	/** Suffix, appended to "/{job-id}", of the JobManager leader lock node */
+	private static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock";
+
+	// ------------------------------------------------------------------------
+
+	/** The ZooKeeper client to use */
+	private final CuratorFramework client;
+
+	/** The executor to run ZooKeeper callbacks on */
+	private final Executor executor;
+
+	/** The runtime configuration */
+	private final Configuration configuration;
+
+	/** The zookeeper based running jobs registry */
+	private final RunningJobsRegistry runningJobsRegistry;
+
+	/**
+	 * Creates ZooKeeper based high-availability services.
+	 *
+	 * @param client the ZooKeeper client; it is closed when these services are closed
+	 * @param executor the executor to run ZooKeeper callbacks on
+	 * @param configuration the runtime configuration
+	 */
+	public ZooKeeperHaServices(CuratorFramework client, Executor executor, Configuration configuration) {
+		this.client = checkNotNull(client);
+		this.executor = checkNotNull(executor);
+		this.configuration = checkNotNull(configuration);
+		this.runningJobsRegistry = new ZooKeeperRunningJobsRegistry(client, configuration);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Services
+	// ------------------------------------------------------------------------
+
+	@Override
+	public LeaderRetrievalService getResourceManagerLeaderRetriever() {
+		return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
+	}
+
+	@Override
+	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
+		return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathForJobManager(jobID));
+	}
+
+	@Override
+	public LeaderElectionService getResourceManagerLeaderElectionService() {
+		return ZooKeeperUtils.createLeaderElectionService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
+	}
+
+	@Override
+	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
+		return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathForJobManager(jobID));
+	}
+
+	@Override
+	public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
+		return new ZooKeeperCheckpointRecoveryFactory(client, configuration, executor);
+	}
+
+	@Override
+	public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
+		return ZooKeeperUtils.createSubmittedJobGraphs(client, configuration, executor);
+	}
+
+	@Override
+	public RunningJobsRegistry getRunningJobsRegistry() {
+		return runningJobsRegistry;
+	}
+
+	@Override
+	public BlobStore createBlobStore() throws IOException {
+		return createBlobStore(configuration);
+	}
+
+	/**
+	 * Creates the BLOB store in which BLOBs are stored in a highly-available
+	 * fashion. BLOBs are placed under the configured HA storage path, in a
+	 * sub-directory named after the configured cluster id.
+	 *
+	 * @param configuration configuration to extract the storage path from
+	 * @return Blob store
+	 * @throws IllegalConfigurationException if the HA storage path is not configured
+	 * @throws IOException if the blob store could not be created
+	 */
+	public static BlobStore createBlobStore(
+		final Configuration configuration) throws IOException {
+		String storagePath = configuration.getValue(
+			HighAvailabilityOptions.HA_STORAGE_PATH);
+		if (isNullOrWhitespaceOnly(storagePath)) {
+			throw new IllegalConfigurationException("Configuration is missing the mandatory parameter: " +
+					HighAvailabilityOptions.HA_STORAGE_PATH.key());
+		}
+
+		final Path path;
+		try {
+			path = new Path(storagePath);
+		} catch (Exception e) {
+			throw new IOException("Invalid path for highly available storage (" +
+					HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
+		}
+
+		final FileSystem fileSystem;
+		try {
+			fileSystem = path.getFileSystem();
+		} catch (Exception e) {
+			throw new IOException("Could not create FileSystem for highly available storage (" +
+					HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
+		}
+
+		final String clusterId =
+			configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
+		// NOTE(review): if clusterId itself starts with '/', this yields a double slash;
+		// left as-is to keep existing storage locations stable
+		storagePath += "/" + clusterId;
+
+		return new FileSystemBlobStore(fileSystem, storagePath);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Shutdown
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void close() throws Exception {
+		client.close();
+	}
+
+	@Override
+	public void closeAndCleanupAllData() throws Exception {
+		// no data is cleaned up here yet; this only closes the services
+		close();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	/** Returns the leader lock path of the JobManager for the given job: "/{jobID}/job_manager_lock". */
+	private static String getPathForJobManager(final JobID jobID) {
+		return "/" + jobID + JOB_MANAGER_LEADER_PATH;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java
new file mode 100644
index 0000000..8a083d1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.zookeeper;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A zookeeper based registry for running jobs, highly available.
+ *
+ * <p>The scheduling status of each job is stored as the enum constant name (UTF-8 encoded) in a
+ * ZooKeeper node whose path is the configured registry path followed by the job ID. Jobs without
+ * such a node are reported as {@link JobSchedulingStatus#PENDING}.
+ */
+public class ZooKeeperRunningJobsRegistry implements RunningJobsRegistry {
+
+	private static final Charset ENCODING = Charset.forName("utf-8");
+
+	/** The ZooKeeper client to use */
+	private final CuratorFramework client;
+
+	/** Path prefix under which the per-job status nodes are stored */
+	private final String runningJobPath;
+
+	public ZooKeeperRunningJobsRegistry(final CuratorFramework client, final Configuration configuration) {
+		this.client = checkNotNull(client, "client");
+		checkNotNull(configuration, "configuration");
+		this.runningJobPath = configuration.getString(HighAvailabilityOptions.ZOOKEEPER_RUNNING_JOB_REGISTRY_PATH);
+	}
+
+	@Override
+	public void setJobRunning(JobID jobID) throws IOException {
+		checkNotNull(jobID);
+
+		try {
+			writeEnumToZooKeeper(jobID, JobSchedulingStatus.RUNNING);
+		}
+		catch (Exception e) {
+			throw new IOException("Failed to set RUNNING state in ZooKeeper for job " + jobID, e);
+		}
+	}
+
+	@Override
+	public void setJobFinished(JobID jobID) throws IOException {
+		checkNotNull(jobID);
+
+		try {
+			writeEnumToZooKeeper(jobID, JobSchedulingStatus.DONE);
+		}
+		catch (Exception e) {
+			throw new IOException("Failed to set DONE state in ZooKeeper for job " + jobID, e);
+		}
+	}
+
+	@Override
+	public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws IOException {
+		checkNotNull(jobID);
+
+		// only the ZooKeeper access is wrapped here, so that a parse failure below is
+		// reported directly instead of being wrapped a second time
+		final byte[] data;
+		try {
+			final String zkPath = createZkPath(jobID);
+			final Stat stat = client.checkExists().forPath(zkPath);
+			data = (stat != null) ? client.getData().forPath(zkPath) : null;
+		}
+		catch (Exception e) {
+			throw new IOException("Get finished state from zk fail for job " + jobID.toString(), e);
+		}
+
+		if (data == null) {
+			// nothing found, yet, must be in status 'PENDING'
+			return JobSchedulingStatus.PENDING;
+		}
+
+		// found some data, try to parse it
+		try {
+			final String name = new String(data, ENCODING);
+			return JobSchedulingStatus.valueOf(name);
+		}
+		catch (IllegalArgumentException e) {
+			throw new IOException("Found corrupt data in ZooKeeper: " +
+					Arrays.toString(data) + " is no valid job status", e);
+		}
+	}
+
+	@Override
+	public void clearJob(JobID jobID) throws IOException {
+		checkNotNull(jobID);
+
+		try {
+			final String zkPath = createZkPath(jobID);
+			// make sure the node exists so that the delete does not fail for unknown jobs
+			this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
+			this.client.delete().forPath(zkPath);
+		}
+		catch (Exception e) {
+			throw new IOException("Failed to clear job state from ZooKeeper for job " + jobID, e);
+		}
+	}
+
+	/**
+	 * Returns the status node path of the given job. No separator is inserted, so this
+	 * assumes the configured registry path ends with '/' - TODO confirm against its default.
+	 */
+	private String createZkPath(JobID jobID) {
+		return runningJobPath + jobID.toString();
+	}
+
+	/** Creates the job's status node if needed and stores the status name in it. */
+	private void writeEnumToZooKeeper(JobID jobID, JobSchedulingStatus status) throws Exception {
+		final String zkPath = createZkPath(jobID);
+		this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
+		this.client.setData().forPath(zkPath, status.name().getBytes(ENCODING));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index e60ff77..3a55f2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -64,7 +64,6 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
 import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
@@ -88,8 +87,8 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.RpcServiceUtils;
 import org.apache.flink.runtime.rpc.StartStoppable;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
@@ -127,6 +126,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
+	/** Default names for Flink's distributed components */
+	public static final String JOB_MANAGER_NAME = "jobmanager";
+	public static final String ARCHIVE_NAME = "archive";
+
 	private static final AtomicReferenceFieldUpdater<JobMaster, UUID> LEADER_ID_UPDATER =
 			AtomicReferenceFieldUpdater.newUpdater(JobMaster.class, UUID.class, "leaderSessionID");
 
@@ -208,7 +211,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			FatalErrorHandler errorHandler,
 			ClassLoader userCodeLoader) throws Exception {
 
-		super(rpcService, RpcServiceUtils.createRandomName(JobManager.JOB_MANAGER_NAME()));
+		super(rpcService, AkkaRpcServiceUtils.createRandomName(JobMaster.JOB_MANAGER_NAME));
 
 		this.resourceId = checkNotNull(resourceId);
 		this.jobGraph = checkNotNull(jobGraph);

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 03fbef5..3e7f2f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -235,7 +235,9 @@ public class MiniCluster {
 
 				// create the high-availability services
 				LOG.info("Starting high-availability services");
-				haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration);
+				haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
+					configuration,
+					commonRpcService.getExecutor());
 
 				heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
index 5d9db19..f9cf01d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
@@ -109,7 +109,7 @@ public class QueryableStateClient {
 
 		// Create a leader retrieval service
 		LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils
-				.createLeaderRetrievalService(config);
+				.createLeaderRetrievalService(config, true);
 
 		// Get the ask timeout
 		String askTimeoutString = config.getString(

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 2ef8276..bef0aa3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -87,6 +87,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		extends RpcEndpoint<ResourceManagerGateway>
 		implements LeaderContender {
 
+	public static final String RESOURCE_MANAGER_NAME = "resourcemanager";
+
 	/** Unique id of the resource manager */
 	private final ResourceID resourceId;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
index fa75bbb..2c64f08 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.resourcemanager;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.Preconditions;
 import scala.concurrent.duration.Duration;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java
index 79d1c02..d4bc2d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.resourcemanager;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ResourceManagerOptions;
-import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerConfiguration;
 import org.apache.flink.util.Preconditions;
 import scala.concurrent.duration.Duration;

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ConfigurationException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ConfigurationException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ConfigurationException.java
deleted file mode 100644
index 0007318..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ConfigurationException.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.exceptions;
-
-/**
- * Exception which occures when creating a configuration object fails.
- */
-public class ConfigurationException extends Exception {
-	private static final long serialVersionUID = 3971647332059381556L;
-
-	public ConfigurationException(String message) {
-		super(message);
-	}
-
-	public ConfigurationException(String message, Throwable cause) {
-		super(message, cause);
-	}
-
-	public ConfigurationException(Throwable cause) {
-		super(cause);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
index d21c251..a651168 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
@@ -23,7 +23,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.Preconditions;
 import scala.concurrent.duration.Duration;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
deleted file mode 100644
index e555e7f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc;
-
-import akka.actor.ActorSystem;
-import com.typesafe.config.Config;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.net.SSLUtils;
-import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
-import org.apache.flink.util.NetUtils;
-
-import org.apache.flink.util.Preconditions;
-import org.jboss.netty.channel.ChannelException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.UnknownHostException;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * These RPC utilities contain helper methods around RPC use, such as starting an RPC service,
- * or constructing RPC addresses.
- */
-public class RpcServiceUtils {
-
-	private static final Logger LOG = LoggerFactory.getLogger(RpcServiceUtils.class);
-
-	private static final AtomicLong nextNameOffset = new AtomicLong(0L);
-
-	// ------------------------------------------------------------------------
-	//  RPC instantiation
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Utility method to create RPC service from configuration and hostname, port.
-	 *
-	 * @param hostname   The hostname/address that describes the TaskManager's data location.
-	 * @param port           If true, the TaskManager will not initiate the TCP network stack.
-	 * @param configuration                 The configuration for the TaskManager.
-	 * @return   The rpc service which is used to start and connect to the TaskManager RpcEndpoint .
-	 * @throws IOException      Thrown, if the actor system can not bind to the address
-	 * @throws Exception      Thrown is some other error occurs while creating akka actor system
-	 */
-	public static RpcService createRpcService(String hostname, int port, Configuration configuration) throws Exception {
-		LOG.info("Starting AkkaRpcService at {}.", NetUtils.hostAndPortToUrlString(hostname, port));
-
-		final ActorSystem actorSystem;
-
-		try {
-			Config akkaConfig;
-
-			if (hostname != null && !hostname.isEmpty()) {
-				// remote akka config
-				akkaConfig = AkkaUtils.getAkkaConfig(configuration, hostname, port);
-			} else {
-				// local akka config
-				akkaConfig = AkkaUtils.getAkkaConfig(configuration);
-			}
-
-			LOG.debug("Using akka configuration \n {}.", akkaConfig);
-
-			actorSystem = AkkaUtils.createActorSystem(akkaConfig);
-		} catch (Throwable t) {
-			if (t instanceof ChannelException) {
-				Throwable cause = t.getCause();
-				if (cause != null && t.getCause() instanceof java.net.BindException) {
-					String address = NetUtils.hostAndPortToUrlString(hostname, port);
-					throw new IOException("Unable to bind AkkaRpcService actor system to address " +
-						address + " - " + cause.getMessage(), t);
-				}
-			}
-			throw new Exception("Could not create TaskManager actor system", t);
-		}
-
-		final Time timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis());
-		return new AkkaRpcService(actorSystem, timeout);
-	}
-
-	// ------------------------------------------------------------------------
-	//  RPC endpoint addressing
-	// ------------------------------------------------------------------------
-
-	/**
-	 *
-	 * @param hostname     The hostname or address where the target RPC service is listening.
-	 * @param port         The port where the target RPC service is listening.
-	 * @param endpointName The name of the RPC endpoint.
-	 * @param config       The configuration from which to deduce further settings.
-	 *
-	 * @return The RPC URL of the specified RPC endpoint.
-	 */
-	public static String getRpcUrl(String hostname, int port, String endpointName, Configuration config)
-			throws UnknownHostException {
-
-		checkNotNull(config, "config is null");
-
-		final boolean sslEnabled = config.getBoolean(
-					ConfigConstants.AKKA_SSL_ENABLED,
-					ConfigConstants.DEFAULT_AKKA_SSL_ENABLED) &&
-				SSLUtils.getSSLEnabled(config);
-
-		return getRpcUrl(hostname, port, endpointName, sslEnabled);
-	}
-
-	/**
-	 * 
-	 * @param hostname     The hostname or address where the target RPC service is listening.
-	 * @param port         The port where the target RPC service is listening.
-	 * @param endpointName The name of the RPC endpoint.
-	 * @param secure       True, if security/encryption is enabled, false otherwise.
-	 * 
-	 * @return The RPC URL of the specified RPC endpoint.
-	 */
-	public static String getRpcUrl(String hostname, int port, String endpointName, boolean secure)
-			throws UnknownHostException {
-
-		checkNotNull(hostname, "hostname is null");
-		checkNotNull(endpointName, "endpointName is null");
-		checkArgument(port > 0 && port <= 65535, "port must be in [1, 65535]");
-
-		final String protocol = secure ? "akka.ssl.tcp" : "akka.tcp";
-		final String hostPort = NetUtils.hostAndPortToUrlString(hostname, port);
-
-		return String.format("%s://flink@%s/user/%s", protocol, hostPort, endpointName);
-	}
-
-	/**
-	 * Creates a random name of the form prefix_X, where X is an increasing number.
-	 *
-	 * @param prefix Prefix string to prepend to the monotonically increasing name offset number
-	 * @return A random name of the form prefix_X where X is an increasing number
-	 */
-	public static String createRandomName(String prefix) {
-		Preconditions.checkNotNull(prefix, "Prefix must not be null.");
-
-		long nameOffset;
-
-		// obtain the next name offset by incrementing it atomically
-		do {
-			nameOffset = nextNameOffset.get();
-		} while (!nextNameOffset.compareAndSet(nameOffset, nameOffset + 1L));
-
-		return prefix + '_' + nameOffset;
-	}
-
-	// ------------------------------------------------------------------------
-
-	/** This class is not meant to be instantiated */
-	private RpcServiceUtils() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
new file mode 100644
index 0000000..eab4de8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.ActorSystem;
+import com.typesafe.config.Config;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution;
+import org.apache.flink.runtime.net.SSLUtils;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.flink.util.Preconditions;
+import org.jboss.netty.channel.ChannelException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * These RPC utilities contain helper methods around RPC use, such as starting an RPC service,
+ * or constructing RPC addresses.
+ */
+public class AkkaRpcServiceUtils {
+
+	private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcServiceUtils.class);
+
+	private static final String AKKA_TCP = "akka.tcp";
+	private static final String AkKA_SSL_TCP = "akka.ssl.tcp";
+
+	private static final AtomicLong nextNameOffset = new AtomicLong(0L);
+
+	// ------------------------------------------------------------------------
+	//  RPC instantiation
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Utility method to create RPC service from configuration and hostname, port.
+	 *
+	 * @param hostname   The hostname/address that describes the TaskManager's data location.
+	 * @param port           If true, the TaskManager will not initiate the TCP network stack.
+	 * @param configuration                 The configuration for the TaskManager.
+	 * @return   The rpc service which is used to start and connect to the TaskManager RpcEndpoint .
+	 * @throws IOException      Thrown, if the actor system can not bind to the address
+	 * @throws Exception      Thrown is some other error occurs while creating akka actor system
+	 */
+	public static RpcService createRpcService(String hostname, int port, Configuration configuration) throws Exception {
+		LOG.info("Starting AkkaRpcService at {}.", NetUtils.hostAndPortToUrlString(hostname, port));
+
+		final ActorSystem actorSystem;
+
+		try {
+			Config akkaConfig;
+
+			if (hostname != null && !hostname.isEmpty()) {
+				// remote akka config
+				akkaConfig = AkkaUtils.getAkkaConfig(configuration, hostname, port);
+			} else {
+				// local akka config
+				akkaConfig = AkkaUtils.getAkkaConfig(configuration);
+			}
+
+			LOG.debug("Using akka configuration \n {}.", akkaConfig);
+
+			actorSystem = AkkaUtils.createActorSystem(akkaConfig);
+		} catch (Throwable t) {
+			if (t instanceof ChannelException) {
+				Throwable cause = t.getCause();
+				if (cause != null && t.getCause() instanceof java.net.BindException) {
+					String address = NetUtils.hostAndPortToUrlString(hostname, port);
+					throw new IOException("Unable to bind AkkaRpcService actor system to address " +
+						address + " - " + cause.getMessage(), t);
+				}
+			}
+			throw new Exception("Could not create TaskManager actor system", t);
+		}
+
+		final Time timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis());
+		return new AkkaRpcService(actorSystem, timeout);
+	}
+
+	// ------------------------------------------------------------------------
+	//  RPC endpoint addressing
+	// ------------------------------------------------------------------------
+
+	/**
+	 *
+	 * @param hostname     The hostname or address where the target RPC service is listening.
+	 * @param port         The port where the target RPC service is listening.
+	 * @param endpointName The name of the RPC endpoint.
+	 * @param config       The configuration from which to deduce further settings.
+	 *
+	 * @return The RPC URL of the specified RPC endpoint.
+	 */
+	public static String getRpcUrl(
+		String hostname,
+		int port,
+		String endpointName,
+		HighAvailabilityServicesUtils.AddressResolution addressResolution,
+		Configuration config) throws UnknownHostException {
+
+		checkNotNull(config, "config is null");
+
+		final boolean sslEnabled = config.getBoolean(
+					ConfigConstants.AKKA_SSL_ENABLED,
+					ConfigConstants.DEFAULT_AKKA_SSL_ENABLED) &&
+				SSLUtils.getSSLEnabled(config);
+
+		return getRpcUrl(
+			hostname,
+			port,
+			endpointName,
+			addressResolution,
+			sslEnabled ? AkkaProtocol.SSL_TCP : AkkaProtocol.TCP);
+	}
+
+	/**
+	 * 
+	 * @param hostname     The hostname or address where the target RPC service is listening.
+	 * @param port         The port where the target RPC service is listening.
+	 * @param endpointName The name of the RPC endpoint.
+	 * @param akkaProtocol       True, if security/encryption is enabled, false otherwise.
+	 * 
+	 * @return The RPC URL of the specified RPC endpoint.
+	 */
+	public static String getRpcUrl(
+			String hostname,
+			int port,
+			String endpointName,
+			HighAvailabilityServicesUtils.AddressResolution addressResolution,
+			AkkaProtocol akkaProtocol) throws UnknownHostException {
+
+		checkNotNull(hostname, "hostname is null");
+		checkNotNull(endpointName, "endpointName is null");
+		checkArgument(port > 0 && port <= 65535, "port must be in [1, 65535]");
+
+		final String protocolPrefix = akkaProtocol == AkkaProtocol.SSL_TCP ? AkKA_SSL_TCP : AKKA_TCP;
+
+		if (addressResolution == AddressResolution.TRY_ADDRESS_RESOLUTION) {
+			// Fail fast if the hostname cannot be resolved
+			//noinspection ResultOfMethodCallIgnored
+			InetAddress.getByName(hostname);
+		}
+
+		final String hostPort = NetUtils.unresolvedHostAndPortToNormalizedString(hostname, port);
+
+		return String.format("%s://flink@%s/user/%s", protocolPrefix, hostPort, endpointName);
+	}
+
+	public enum AkkaProtocol {
+		TCP,
+		SSL_TCP
+	}
+
+	/**
+	 * Creates a random name of the form prefix_X, where X is an increasing number.
+	 *
+	 * @param prefix Prefix string to prepend to the monotonically increasing name offset number
+	 * @return A random name of the form prefix_X where X is an increasing number
+	 */
+	public static String createRandomName(String prefix) {
+		Preconditions.checkNotNull(prefix, "Prefix must not be null.");
+
+		long nameOffset;
+
+		// obtain the next name offset by incrementing it atomically
+		do {
+			nameOffset = nextNameOffset.get();
+		} while (!nextNameOffset.compareAndSet(nameOffset, nameOffset + 1L));
+
+		return prefix + '_' + nameOffset;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** This class is not meant to be instantiated */
+	private AkkaRpcServiceUtils() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 5b8c8ee..d05d900 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -65,7 +65,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.RpcServiceUtils;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException;
 import org.apache.flink.runtime.taskexecutor.exceptions.PartitionException;
 import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
@@ -85,7 +85,6 @@ import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
@@ -111,6 +110,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
+	public static final String TASK_MANAGER_NAME = "taskmanager";
+
 	/** The connection information of this task manager */
 	private final TaskManagerLocation taskManagerLocation;
 
@@ -186,7 +187,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 			JobLeaderService jobLeaderService,
 			FatalErrorHandler fatalErrorHandler) {
 
-		super(rpcService, RpcServiceUtils.createRandomName(TaskManager.TASK_MANAGER_NAME()));
+		super(rpcService, AkkaRpcServiceUtils.createRandomName(TaskExecutor.TASK_MANAGER_NAME));
 
 		checkArgument(taskManagerConfiguration.getNumberSlots() > 0, "The number of slots has to be larger than 0.");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a0bb99c7/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 2be8ff1..2ed1578 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.RpcServiceUtils;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 
@@ -219,6 +219,6 @@ public class TaskManagerRunner implements FatalErrorHandler {
 				"use 0 to let the system choose port automatically.",
 			ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort);
 
-		return RpcServiceUtils.createRpcService(taskManagerHostname, rpcPort, configuration);
+		return AkkaRpcServiceUtils.createRpcService(taskManagerHostname, rpcPort, configuration);
 	}
 }