You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:28 UTC

[09/52] [abbrv] flink git commit: [FLINK-4835] [cluster management] Add embedded version of the high-availability services

[FLINK-4835] [cluster management] Add embedded version of the high-availability services

This includes the addition of the EmbeddedLeaderService
and a clean shutdown hook for all high availability services.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9615f15b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9615f15b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9615f15b

Branch: refs/heads/master
Commit: 9615f15beca37d6393f6ea78ec35f712536c8f64
Parents: 208324d
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Oct 14 23:57:11 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:23 2016 +0100

----------------------------------------------------------------------
 .../StandaloneCheckpointRecoveryFactory.java    |   4 +-
 .../highavailability/EmbeddedNonHaServices.java |  62 +++
 .../HighAvailabilityServices.java               |   7 +-
 .../runtime/highavailability/NonHaServices.java |  62 +--
 .../highavailability/ZookeeperHaServices.java   |  12 +-
 .../nonha/AbstractNonHaServices.java            | 175 +++++++
 .../nonha/EmbeddedLeaderService.java            | 466 +++++++++++++++++++
 .../TestingHighAvailabilityServices.java        |   9 +
 8 files changed, 736 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9615f15b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
index a9624fb..57785ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
@@ -40,8 +40,8 @@ public class StandaloneCheckpointRecoveryFactory implements CheckpointRecoveryFa
 	public CompletedCheckpointStore createCheckpointStore(JobID jobId, ClassLoader userClassLoader)
 			throws Exception {
 
-		return new StandaloneCompletedCheckpointStore(CheckpointRecoveryFactory
-				.NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN);
+		return new StandaloneCompletedCheckpointStore(
+				CheckpointRecoveryFactory.NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9615f15b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
new file mode 100644
index 0000000..58da287
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.runtime.highavailability.nonha.AbstractNonHaServices;
+import org.apache.flink.runtime.highavailability.nonha.EmbeddedLeaderService;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+/**
+ * An implementation of the {@link HighAvailabilityServices} for the non-high-availability case
+ * where all participants (ResourceManager, JobManagers, TaskManagers) run in the same process.
+ *
+ * <p>This implementation has no dependencies on any external services. Leaders are elected by
+ * in-process {@link EmbeddedLeaderService}s (one for the ResourceManager, one per job for the
+ * JobManagers), and checkpoints and metadata are stored simply on the heap or on a local file
+ * system and therefore in a storage without guarantees.
+ */
+public class EmbeddedNonHaServices extends AbstractNonHaServices implements HighAvailabilityServices {
+
+	/** Embedded leader election and retrieval for the ResourceManager. */
+	private final EmbeddedLeaderService resourceManagerLeaderService;
+
+	/**
+	 * Creates new embedded services. The ResourceManager leader service dispatches its
+	 * notifications on the executor that is shared with the JobManager leader services.
+	 */
+	public EmbeddedNonHaServices() {
+		super();
+		this.resourceManagerLeaderService = new EmbeddedLeaderService(getExecutorService());
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
+		return resourceManagerLeaderService.createLeaderRetrievalService();
+	}
+
+	@Override
+	public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
+		return resourceManagerLeaderService.createLeaderElectionService();
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Shuts down the shared services and the ResourceManager leader service. The leader
+	 * service is shut down even if shutting down the shared services fails.
+	 */
+	@Override
+	public void shutdown() throws Exception {
+		try {
+			super.shutdown();
+		}
+		finally {
+			resourceManagerLeaderService.shutdown();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9615f15b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index 484cddb..f6db682 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -49,11 +49,10 @@ public interface HighAvailabilityServices {
 	 * Gets the leader retriever for the job JobMaster which is responsible for the given job
 	 *
 	 * @param jobID The identifier of the job.
-	 * @param defaultAddress address under which the job manager is reachable
 	 * @return
 	 * @throws Exception
 	 */
-	LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultAddress) throws Exception;
+	LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception;
 
 	/**
 	 * Gets the leader election service for the cluster's resource manager.
@@ -86,4 +85,14 @@ public interface HighAvailabilityServices {
 	 * Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
 	 */
 	BlobStore createBlobStore() throws IOException;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Shuts down the high availability services and releases the resources held by them
+	 * (for example, executor threads or the ZooKeeper client).
+	 *
+	 * @throws Exception if the services could not be shut down cleanly
+	 */
+	void shutdown() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9615f15b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index 1c73c01..107cbd0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -18,21 +18,13 @@
 
 package org.apache.flink.runtime.highavailability;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.blob.BlobStore;
-import org.apache.flink.runtime.blob.VoidBlobStore;
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
-import org.apache.flink.runtime.highavailability.nonha.NonHaRegistry;
-import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
-import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+import org.apache.flink.runtime.highavailability.nonha.AbstractNonHaServices;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -41,35 +33,23 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * This implementation can be used for testing, and for cluster setups that do not
  * tolerate failures of the master processes (JobManager, ResourceManager).
  * 
- * <p>This implementation has no dependencies on any external services. It returns fix
- * pre-configured leaders, and stores checkpoints and metadata simply on the heap and therefore
- * in volatile memory.
+ * <p>This implementation has no dependencies on any external services. It returns a fixed,
+ * pre-configured ResourceManager address, and stores checkpoints and metadata simply on the heap
+ * or on a local file system and therefore in a storage without guarantees.
  */
-public class NonHaServices implements HighAvailabilityServices {
+public class NonHaServices extends AbstractNonHaServices implements HighAvailabilityServices {
 
 	/** The fix address of the ResourceManager */
 	private final String resourceManagerAddress;
 
-	private final ConcurrentHashMap<JobID, String> jobMastersAddress;
-
 	/**
 	 * Creates a new services class for the fix pre-defined leaders.
 	 * 
 	 * @param resourceManagerAddress    The fix address of the ResourceManager
 	 */
 	public NonHaServices(String resourceManagerAddress) {
+		super();
 		this.resourceManagerAddress = checkNotNull(resourceManagerAddress);
-		this.jobMastersAddress = new ConcurrentHashMap<>(16);
-	}
-
-	/**
-	 * Binds address of a specified job master
-	 *
-	 * @param jobID            JobID for the specified job master
-	 * @param jobMasterAddress address for the specified job master
-	 */
-	public void bindJobMasterLeaderAddress(JobID jobID, String jobMasterAddress) {
-		jobMastersAddress.put(jobID, jobMasterAddress);
 	}
 
 	// ------------------------------------------------------------------------
@@ -82,37 +62,7 @@ public class NonHaServices implements HighAvailabilityServices {
 	}
 
 	@Override
-	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultAddress) throws Exception {
-		return new StandaloneLeaderRetrievalService(defaultAddress, new UUID(0, 0));
-	}
-
-	@Override
 	public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
 		return new StandaloneLeaderElectionService();
 	}
-
-	@Override
-	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception {
-		return new StandaloneLeaderElectionService();
-	}
-
-	@Override
-	public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception {
-		return new StandaloneCheckpointRecoveryFactory();
-	}
-
-	@Override
-	public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
-		return new StandaloneSubmittedJobGraphStore();
-	}
-
-	@Override
-	public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
-		return new NonHaRegistry();
-	}
-
-	@Override
-	public BlobStore createBlobStore() {
-		return new VoidBlobStore();
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9615f15b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
index 98b2890..be19c60 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
@@ -169,6 +169,18 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
 	}
 
 	// ------------------------------------------------------------------------
+	//  Shutdown
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Shuts down these services by closing the ZooKeeper client.
+	 */
+	@Override
+	public void shutdown() throws Exception {
+		client.close();
+	}
+
+	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9615f15b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
new file mode 100644
index 0000000..8c15a52
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.BlobStore;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Base for all {@link HighAvailabilityServices} that are not highly available: they are backed
+ * by storage that has no availability guarantees and by leader election services that cannot
+ * elect among multiple distributed leader contenders.
+ *
+ * <p>JobManager leader election and retrieval is handled by one in-process
+ * {@link EmbeddedLeaderService} per job, created lazily. Their notifications are dispatched by
+ * a cached pool of daemon threads, which subclasses can reuse via {@link #getExecutorService()}.
+ */
+public abstract class AbstractNonHaServices implements HighAvailabilityServices {
+
+	/** Lock guarding the per-job leader services and the shutdown transition. */
+	private final Object lock = new Object();
+
+	/** Executor that dispatches the notifications of the embedded leader services. */
+	private final ExecutorService executor;
+
+	/** The embedded leader service of each job's JobManager, created on first access. */
+	@GuardedBy("lock")
+	private final HashMap<JobID, EmbeddedLeaderService> jobManagerLeaderServices;
+
+	/** Single registry instance, returned by every call to {@link #getRunningJobsRegistry()}. */
+	private final RunningJobsRegistry runningJobsRegistry;
+
+	/** Only written under the lock; volatile because some accessors check it without the lock. */
+	private volatile boolean shutdown;
+
+	// ------------------------------------------------------------------------
+
+	public AbstractNonHaServices() {
+		this.executor = Executors.newCachedThreadPool(new ServicesThreadFactory());
+		this.jobManagerLeaderServices = new HashMap<>();
+		this.runningJobsRegistry = new NonHaRegistry();
+	}
+
+	// ------------------------------------------------------------------------
+	//  services
+	// ------------------------------------------------------------------------
+
+	@Override
+	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
+		checkNotNull(jobID);
+
+		synchronized (lock) {
+			checkNotShutdown();
+			EmbeddedLeaderService service = getOrCreateJobManagerService(jobID);
+			return service.createLeaderRetrievalService();
+		}
+	}
+
+	@Override
+	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception {
+		checkNotNull(jobID);
+
+		synchronized (lock) {
+			checkNotShutdown();
+			EmbeddedLeaderService service = getOrCreateJobManagerService(jobID);
+			return service.createLeaderElectionService();
+		}
+	}
+
+	/** Returns the leader service of the given job, creating and registering it if absent. */
+	@GuardedBy("lock")
+	private EmbeddedLeaderService getOrCreateJobManagerService(JobID jobID) {
+		EmbeddedLeaderService service = jobManagerLeaderServices.get(jobID);
+		if (service == null) {
+			service = new EmbeddedLeaderService(executor);
+			jobManagerLeaderServices.put(jobID, service);
+		}
+		return service;
+	}
+
+	@Override
+	public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception {
+		checkNotShutdown();
+		return new StandaloneCheckpointRecoveryFactory();
+	}
+
+	@Override
+	public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
+		checkNotShutdown();
+		return new StandaloneSubmittedJobGraphStore();
+	}
+
+	@Override
+	public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
+		checkNotShutdown();
+		return runningJobsRegistry;
+	}
+
+	@Override
+	public BlobStore createBlobStore() throws IOException {
+		checkNotShutdown();
+		return new VoidBlobStore();
+	}
+
+	// ------------------------------------------------------------------------
+	//  shutdown
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Shuts down the notification executor and all JobManager leader services. Repeated
+	 * calls have no effect. If shutting down a leader service fails, the remaining services
+	 * are still shut down and the first failure is rethrown, with later ones suppressed.
+	 */
+	@Override
+	public void shutdown() throws Exception {
+		synchronized (lock) {
+			if (shutdown) {
+				return;
+			}
+			shutdown = true;
+
+			// no further calls should be dispatched
+			executor.shutdownNow();
+
+			// stop all job manager leader services, even if one of them fails
+			Exception firstError = null;
+			for (EmbeddedLeaderService service : jobManagerLeaderServices.values()) {
+				try {
+					service.shutdown();
+				}
+				catch (Exception e) {
+					if (firstError == null) {
+						firstError = e;
+					}
+					else {
+						firstError.addSuppressed(e);
+					}
+				}
+			}
+			jobManagerLeaderServices.clear();
+
+			if (firstError != null) {
+				throw firstError;
+			}
+		}
+	}
+
+	private void checkNotShutdown() {
+		checkState(!shutdown, "high availability services are shut down");
+	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the executor that dispatches leader notifications, so that subclasses can create
+	 * additional {@link EmbeddedLeaderService}s on it. It is shut down by {@link #shutdown()}.
+	 */
+	protected ExecutorService getExecutorService() {
+		return executor;
+	}
+
+	/** Creates daemon threads with a running number in their name. */
+	private static final class ServicesThreadFactory implements ThreadFactory {
+
+		private final AtomicInteger enumerator = new AtomicInteger();
+
+		@Override
+		public Thread newThread(@Nonnull Runnable r) {
+			Thread thread = new Thread(r, "Flink HA services thread #" + enumerator.incrementAndGet());
+			thread.setDaemon(true);
+			return thread;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9615f15b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
new file mode 100644
index 0000000..84ac551
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha;
+
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A simple leader election service, which selects a leader among contenders and notifies listeners.
+ * 
+ * <p>An election service for contenders can be created via {@link #createLeaderElectionService()},
+ * a listener service for leader observers can be created via {@link #createLeaderRetrievalService()}.
+ */
+public class EmbeddedLeaderService {
+
+	private static final Logger LOG = LoggerFactory.getLogger(EmbeddedLeaderService.class);
+
+	private final Object lock = new Object(); // guards all leader, contender and listener state
+
+	private final Executor notificationExecutor;
+
+	private final Set<EmbeddedLeaderElectionService> allLeaderContenders;
+
+	private final Set<EmbeddedLeaderRetrievalService> listeners;
+
+	/** proposed leader, which has been notified of leadership grant, but has not confirmed */
+	private EmbeddedLeaderElectionService currentLeaderProposed;
+
+	/** actual leader that has confirmed leadership and of which listeners have been notified */
+	private EmbeddedLeaderElectionService currentLeaderConfirmed;
+
+	/** fencing UID for the current leader (or proposed leader) */
+	private UUID currentLeaderSessionId;
+
+	/** the cached address of the current leader */
+	private String currentLeaderAddress;
+
+	/** flag marking the service as terminated; volatile because it is also read without the lock */
+	private volatile boolean shutdown;
+
+	// ------------------------------------------------------------------------
+
+	public EmbeddedLeaderService(ExecutorService notificationsDispatcher) {
+		this.notificationExecutor = checkNotNull(notificationsDispatcher);
+		this.allLeaderContenders = new HashSet<>();
+		this.listeners = new HashSet<>();
+	}
+
+	// ------------------------------------------------------------------------
+	//  shutdown and errors
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Shuts down this leader election service.
+	 *
+	 * <p>This method does not cleanly revoke the leader status and does not send a leader
+	 * notification to listeners. It only reports an error to all contenders and listeners,
+	 * signalling that the service is no longer available.
+	 */
+	public void shutdown() {
+		synchronized (lock) {
+			shutdownInternally(new Exception("Leader election service is shutting down"));
+		}
+	}
+
+	private void fatalError(Throwable error) {
+		LOG.error("Embedded leader election service encountered a fatal error. Shutting down service.", error);
+
+		synchronized (lock) {
+			shutdownInternally(new Exception("Leader election service is shutting down after a fatal error", error));
+		}
+	}
+
+	@GuardedBy("lock")
+	private void shutdownInternally(Exception exceptionForHandlers) {
+		assert Thread.holdsLock(lock);
+
+		if (!shutdown) {
+			// mark first, so that handlers calling back into this service are rejected or ignored
+			shutdown = true;
+			// clear all leader status
+			currentLeaderProposed = null;
+			currentLeaderConfirmed = null;
+			currentLeaderSessionId = null;
+			currentLeaderAddress = null;
+
+			// fail all registered contenders
+			for (EmbeddedLeaderElectionService service : allLeaderContenders) {
+				service.shutdown(exceptionForHandlers);
+			}
+			allLeaderContenders.clear();
+
+			// fail all registered listeners
+			for (EmbeddedLeaderRetrievalService service : listeners) {
+				service.shutdown(exceptionForHandlers);
+			}
+			listeners.clear();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  creating contenders and listeners
+	// ------------------------------------------------------------------------
+
+	public LeaderElectionService createLeaderElectionService() {
+		checkState(!shutdown, "leader election service is shut down"); // fail fast; start() re-checks under the lock
+		return new EmbeddedLeaderElectionService();
+	}
+
+	public LeaderRetrievalService createLeaderRetrievalService() {
+		checkState(!shutdown, "leader election service is shut down"); // fail fast; start() re-checks under the lock
+		return new EmbeddedLeaderRetrievalService();
+	}
+
+	// ------------------------------------------------------------------------
+	//  adding and removing contenders & listeners
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Callback from a starting election service: registers its contender and triggers leader selection.
+	 */
+	void addContender(EmbeddedLeaderElectionService service, LeaderContender contender) {
+		synchronized (lock) {
+			checkState(!shutdown, "leader election service is shut down");
+			checkState(!service.running, "leader election service is already started");
+
+			try {
+				if (!allLeaderContenders.add(service)) {
+					throw new IllegalStateException("leader election service was added to this service multiple times");
+				}
+
+				service.contender = contender;
+				service.running = true;
+
+				updateLeader();
+			}
+			catch (Throwable t) { // any unexpected failure shuts down the whole service
+				fatalError(t);
+			}
+		}
+	}
+
+	/**
+	 * Callback from leader contenders when they stop their service; a new leader is selected if needed.
+	 */
+	void removeContender(EmbeddedLeaderElectionService service) {
+		synchronized (lock) {
+			// nothing to do if the service was never started, is already stopped, or we are shut down
+			if (!service.running || shutdown) {
+				return;
+			}
+
+			try {
+				if (!allLeaderContenders.remove(service)) {
+					throw new IllegalStateException("leader election service does not belong to this service");
+				}
+
+				// stop the service
+				service.contender = null;
+				service.running = false;
+				service.isLeader = false;
+
+				// if that was the confirmed or the proposed leader, unset its status
+				if (currentLeaderConfirmed == service) {
+					currentLeaderConfirmed = null;
+					currentLeaderSessionId = null;
+					currentLeaderAddress = null;
+				}
+				if (currentLeaderProposed == service) {
+					currentLeaderProposed = null;
+					currentLeaderSessionId = null;
+				}
+
+				updateLeader(); // proposes a new leader, or announces that there is none, if no leader remains
+			}
+			catch (Throwable t) {
+				fatalError(t);
+			}
+		}
+	}
+
+	/**
+	 * Callback from leader contenders when they confirm a leader grant.
+	 */
+	void confirmLeader(final EmbeddedLeaderElectionService service, final UUID leaderSessionId) {
+		synchronized (lock) {
+			// ignore confirmations from stopped services, or if this service was shut down in the meantime
+			if (!service.running || shutdown) {
+				return;
+			}
+
+			try {
+				// only accept a confirmation of the currently proposed grant; anything else is stale
+				if (service == currentLeaderProposed && currentLeaderSessionId.equals(leaderSessionId)) {
+					final String address = service.contender.getAddress();
+					LOG.info("Received confirmation of leadership for leader {} / session={}", address, leaderSessionId);
+
+					// mark leadership
+					currentLeaderConfirmed = service;
+					currentLeaderAddress = address;
+					currentLeaderProposed = null;
+					service.isLeader = true;
+
+					// notify all listeners
+					for (EmbeddedLeaderRetrievalService listener : listeners) {
+						notificationExecutor.execute(
+								new NotifyOfLeaderCall(address, leaderSessionId, listener.listener, LOG));
+					}
+				}
+				else {
+					LOG.debug("Received confirmation of leadership for a stale leadership grant. Ignoring.");
+				}
+			}
+			catch (Throwable t) {
+				fatalError(t);
+			}
+		}
+	}
+
+	@GuardedBy("lock")
+	private void updateLeader() {
+		// this must be called under the lock; it only acts when there is no confirmed or proposed leader
+		assert Thread.holdsLock(lock);
+
+		if (currentLeaderConfirmed == null && currentLeaderProposed == null) {
+			// we need a new leader
+			if (allLeaderContenders.isEmpty()) {
+				// no new leader available, tell everyone that there is no leader currently
+				for (EmbeddedLeaderRetrievalService listener : listeners) {
+					notificationExecutor.execute(
+							new NotifyOfLeaderCall(null, null, listener.listener, LOG));
+				}
+			}
+			else {
+				// propose an arbitrary contender with a fresh session ID; listeners learn of it only after it confirms
+				final UUID leaderSessionId = UUID.randomUUID();
+				EmbeddedLeaderElectionService leaderService = allLeaderContenders.iterator().next();
+
+				currentLeaderSessionId = leaderSessionId;
+				currentLeaderProposed = leaderService;
+
+				notificationExecutor.execute(
+						new GrantLeadershipCall(leaderService.contender, leaderSessionId, LOG));
+			}
+		}
+	}
+
+	void addListener(EmbeddedLeaderRetrievalService service, LeaderRetrievalListener listener) { // called from EmbeddedLeaderRetrievalService#start
+		synchronized (lock) {
+			checkState(!shutdown, "leader election service is shut down");
+			checkState(!service.running, "leader retrieval service is already started");
+
+			try {
+				if (!listeners.add(service)) {
+					throw new IllegalStateException("leader retrieval service was added to this service multiple times");
+				}
+
+				service.listener = listener;
+				service.running = true;
+
+				// if a leader is already confirmed, notify this new listener right away (asynchronously)
+				if (currentLeaderConfirmed != null) {
+					notificationExecutor.execute(
+							new NotifyOfLeaderCall(currentLeaderAddress, currentLeaderSessionId, listener, LOG));
+				}
+			}
+			catch (Throwable t) {
+				fatalError(t);
+			}
+		}
+	}
+
+	void removeListener(EmbeddedLeaderRetrievalService service) { // called from EmbeddedLeaderRetrievalService#stop
+		synchronized (lock) {
+			// nothing to do if the service was never started, is already stopped, or we are shut down
+			if (!service.running || shutdown) {
+				return;
+			}
+
+			try {
+				if (!listeners.remove(service)) {
+					throw new IllegalStateException("leader retrieval service does not belong to this service");
+				}
+
+				// stop the service
+				service.listener = null;
+				service.running = false;
+			}
+			catch (Throwable t) {
+				fatalError(t);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  election and retrieval service implementations 
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The {@link LeaderElectionService} handed out by this embedded leader service. Start, stop
+	 * and confirmation are delegated to the enclosing service. This class only carries the
+	 * per-contender state that the enclosing service reads and writes.
+	 */
+	private class EmbeddedLeaderElectionService implements LeaderElectionService {
+
+		/** The registered contender (presumably assigned by addContender); cleared by shutdown. */
+		volatile LeaderContender contender;
+
+		/** Leadership flag reported by {@link #hasLeadership()}; cleared by shutdown. */
+		volatile boolean isLeader;
+
+		/** Whether this service is active; {@link #shutdown(Exception)} only acts when set. */
+		volatile boolean running;
+
+		@Override
+		public void start(LeaderContender contender) throws Exception {
+			addContender(this, checkNotNull(contender));
+		}
+
+		@Override
+		public void stop() throws Exception {
+			removeContender(this);
+		}
+
+		@Override
+		public void confirmLeaderSessionID(UUID leaderSessionID) {
+			confirmLeader(this, checkNotNull(leaderSessionID));
+		}
+
+		@Override
+		public boolean hasLeadership() {
+			return isLeader;
+		}
+
+		/**
+		 * Deactivates this service, drops leadership and reports {@code cause} to the contender.
+		 * Does nothing if the service is not running.
+		 */
+		void shutdown(Exception cause) {
+			if (!running) {
+				return;
+			}
+
+			running = false;
+			isLeader = false;
+			contender.handleError(cause);
+			contender = null;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The {@link LeaderRetrievalService} handed out by this embedded leader service. Starting and
+	 * stopping delegate to the enclosing service's {@code addListener} / {@code removeListener}.
+	 */
+	private class EmbeddedLeaderRetrievalService implements LeaderRetrievalService {
+
+		/** Listener attached by addListener; cleared when the service is removed or shut down. */
+		volatile LeaderRetrievalListener listener;
+
+		/** Whether this service is currently registered with the enclosing service. */
+		volatile boolean running;
+
+		@Override
+		public void start(LeaderRetrievalListener listener) throws Exception {
+			addListener(this, checkNotNull(listener));
+		}
+
+		@Override
+		public void stop() throws Exception {
+			removeListener(this);
+		}
+
+		/**
+		 * Deactivates this service and reports {@code cause} to its listener. Does nothing if the
+		 * service is not running.
+		 */
+		public void shutdown(Exception cause) {
+			if (!running) {
+				return;
+			}
+
+			running = false;
+			listener.handleError(cause);
+			listener = null;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  asynchronous notifications
+	// ------------------------------------------------------------------------
+
+	private static class NotifyOfLeaderCall implements Runnable {
+
+		@Nullable
+		private final String address;       // null if leader revoked without new leader
+		@Nullable
+		private final UUID leaderSessionId; // null if leader revoked without new leader
+
+		private final LeaderRetrievalListener listener;
+		private final Logger logger;
+
+		NotifyOfLeaderCall(
+				@Nullable String address,
+				@Nullable UUID leaderSessionId,
+				LeaderRetrievalListener listener,
+				Logger logger) {
+
+			this.address = address;
+			this.leaderSessionId = leaderSessionId;
+			this.listener = checkNotNull(listener);
+			this.logger = checkNotNull(logger);
+		}
+
+		@Override
+		public void run() {
+			try {
+				listener.notifyLeaderAddress(address, leaderSessionId);
+			}
+			catch (Throwable t) {
+				logger.warn("Error notifying leader listener about new leader", t);
+				listener.handleError(t instanceof Exception ? (Exception) t : new Exception(t));
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Asynchronous task that grants leadership under the given session ID to one contender.
+	 * A failure inside the contender is logged and then forwarded to the contender's error handler.
+	 */
+	private static class GrantLeadershipCall implements Runnable {
+
+		private final LeaderContender contender;
+		private final UUID leaderSessionId;
+		private final Logger logger;
+
+		GrantLeadershipCall(
+				LeaderContender contender,
+				UUID leaderSessionId,
+				Logger logger) {
+
+			this.contender = checkNotNull(contender);
+			this.leaderSessionId = checkNotNull(leaderSessionId);
+			this.logger = checkNotNull(logger);
+		}
+
+		@Override
+		public void run() {
+			try {
+				contender.grantLeadership(leaderSessionId);
+			}
+			catch (Throwable t) {
+				// this task grants leadership; the message must not claim a listener notification failed
+				logger.warn("Error granting leadership to contender", t);
+				// handleError takes an Exception, so wrap non-Exception throwables
+				contender.handleError(t instanceof Exception ? (Exception) t : new Exception(t));
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9615f15b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 38e372d..3e88e8c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -154,4 +154,13 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 	public BlobStore createBlobStore() throws IOException {
 		// Each call returns a new VoidBlobStore (presumably a no-op store suitable for tests; verify).
 		return new VoidBlobStore();
 	}
+
+	// ------------------------------------------------------------------------
+	//  Shutdown
+	// ------------------------------------------------------------------------
+
+	/**
+	 * No-op. Shutting down the HA services should release cross-service parts, not the individual
+	 * services, so these testing services have nothing to do here.
+	 */
+	@Override
+	public void shutdown() throws Exception {
+		// deliberately empty: individual services are not shut down by this method
+	}
 }