You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:28 UTC
[09/52] [abbrv] flink git commit: [FLINK-4835] [cluster management]
Add embedded version of the high-availability services
[FLINK-4835] [cluster management] Add embedded version of the high-availability services
This includes the addition of the EmbeddedLeaderService
and a clean shutdown hook for all high availability services.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9615f15b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9615f15b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9615f15b
Branch: refs/heads/master
Commit: 9615f15beca37d6393f6ea78ec35f712536c8f64
Parents: 208324d
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Oct 14 23:57:11 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:23 2016 +0100
----------------------------------------------------------------------
.../StandaloneCheckpointRecoveryFactory.java | 4 +-
.../highavailability/EmbeddedNonHaServices.java | 62 +++
.../HighAvailabilityServices.java | 7 +-
.../runtime/highavailability/NonHaServices.java | 62 +--
.../highavailability/ZookeeperHaServices.java | 12 +-
.../nonha/AbstractNonHaServices.java | 175 +++++++
.../nonha/EmbeddedLeaderService.java | 466 +++++++++++++++++++
.../TestingHighAvailabilityServices.java | 9 +
8 files changed, 736 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9615f15b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
index a9624fb..57785ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
@@ -40,8 +40,8 @@ public class StandaloneCheckpointRecoveryFactory implements CheckpointRecoveryFa
public CompletedCheckpointStore createCheckpointStore(JobID jobId, ClassLoader userClassLoader)
throws Exception {
- return new StandaloneCompletedCheckpointStore(CheckpointRecoveryFactory
- .NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN);
+ return new StandaloneCompletedCheckpointStore(
+ CheckpointRecoveryFactory.NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/9615f15b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
new file mode 100644
index 0000000..58da287
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.runtime.highavailability.nonha.AbstractNonHaServices;
+import org.apache.flink.runtime.highavailability.nonha.EmbeddedLeaderService;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+/**
+ * An implementation of the {@link HighAvailabilityServices} for the non-high-availability case
+ * where all participants (ResourceManager, JobManagers, TaskManagers) run in the same process.
+ *
+ * <p>This implementation has no dependencies on any external services. The ResourceManager
+ * leader is determined through an in-process {@link EmbeddedLeaderService}, and checkpoints and
+ * metadata are stored simply on the heap or on a local file system and therefore in a storage
+ * without guarantees.
+ */
+public class EmbeddedNonHaServices extends AbstractNonHaServices implements HighAvailabilityServices {
+
+	/** The in-process leader service shared by all ResourceManager contenders and listeners */
+	private final EmbeddedLeaderService resourceManagerLeaderService;
+
+	/**
+	 * Creates the embedded services. The ResourceManager leader service uses the executor
+	 * of the base class.
+	 */
+	public EmbeddedNonHaServices() {
+		super();
+		this.resourceManagerLeaderService = new EmbeddedLeaderService(getExecutorService());
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
+		return resourceManagerLeaderService.createLeaderRetrievalService();
+	}
+
+	@Override
+	public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
+		return resourceManagerLeaderService.createLeaderElectionService();
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Shuts down the base services and the ResourceManager leader service.
+	 *
+	 * @throws Exception Thrown, if the shutdown of the base services fails.
+	 */
+	@Override
+	public void shutdown() throws Exception {
+		try {
+			super.shutdown();
+		}
+		finally {
+			// always stop the ResourceManager leader service, even if the base shutdown failed,
+			// so that its contenders and listeners are not left registered
+			resourceManagerLeaderService.shutdown();
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9615f15b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index 484cddb..f6db682 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -49,11 +49,10 @@ public interface HighAvailabilityServices {
* Gets the leader retriever for the job JobMaster which is responsible for the given job
*
* @param jobID The identifier of the job.
- * @param defaultAddress address under which the job manager is reachable
* @return
* @throws Exception
*/
- LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultAddress) throws Exception;
+ LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception;
/**
* Gets the leader election service for the cluster's resource manager.
@@ -86,4 +85,8 @@ public interface HighAvailabilityServices {
* Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
*/
BlobStore createBlobStore() throws IOException;
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Shuts down the high availability services and releases the resources held by them.
+ *
+ * @throws Exception Thrown, if an error occurs during the shutdown.
+ */
+ void shutdown() throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9615f15b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index 1c73c01..107cbd0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -18,21 +18,13 @@
package org.apache.flink.runtime.highavailability;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.blob.BlobStore;
-import org.apache.flink.runtime.blob.VoidBlobStore;
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
-import org.apache.flink.runtime.highavailability.nonha.NonHaRegistry;
-import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
-import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+import org.apache.flink.runtime.highavailability.nonha.AbstractNonHaServices;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -41,35 +33,23 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* This implementation can be used for testing, and for cluster setups that do not
* tolerate failures of the master processes (JobManager, ResourceManager).
*
- * <p>This implementation has no dependencies on any external services. It returns fix
- * pre-configured leaders, and stores checkpoints and metadata simply on the heap and therefore
- * in volatile memory.
+ * <p>This implementation has no dependencies on any external services. It returns a fixed,
+ * pre-configured ResourceManager, and stores checkpoints and metadata simply on the heap or
+ * on a local file system and therefore in a storage without guarantees.
*/
-public class NonHaServices implements HighAvailabilityServices {
+public class NonHaServices extends AbstractNonHaServices implements HighAvailabilityServices {
/** The fix address of the ResourceManager */
private final String resourceManagerAddress;
- private final ConcurrentHashMap<JobID, String> jobMastersAddress;
-
/**
* Creates a new services class for the fix pre-defined leaders.
*
* @param resourceManagerAddress The fix address of the ResourceManager
*/
public NonHaServices(String resourceManagerAddress) {
+ super();
this.resourceManagerAddress = checkNotNull(resourceManagerAddress);
- this.jobMastersAddress = new ConcurrentHashMap<>(16);
- }
-
- /**
- * Binds address of a specified job master
- *
- * @param jobID JobID for the specified job master
- * @param jobMasterAddress address for the specified job master
- */
- public void bindJobMasterLeaderAddress(JobID jobID, String jobMasterAddress) {
- jobMastersAddress.put(jobID, jobMasterAddress);
}
// ------------------------------------------------------------------------
@@ -82,37 +62,7 @@ public class NonHaServices implements HighAvailabilityServices {
}
@Override
- public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultAddress) throws Exception {
- return new StandaloneLeaderRetrievalService(defaultAddress, new UUID(0, 0));
- }
-
- @Override
public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
return new StandaloneLeaderElectionService();
}
-
- @Override
- public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception {
- return new StandaloneLeaderElectionService();
- }
-
- @Override
- public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception {
- return new StandaloneCheckpointRecoveryFactory();
- }
-
- @Override
- public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
- return new StandaloneSubmittedJobGraphStore();
- }
-
- @Override
- public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
- return new NonHaRegistry();
- }
-
- @Override
- public BlobStore createBlobStore() {
- return new VoidBlobStore();
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9615f15b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
index 98b2890..be19c60 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
@@ -86,7 +86,8 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
private static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock";
// ------------------------------------------------------------------------
-
+
+
/** The ZooKeeper client to use */
private final CuratorFramework client;
@@ -169,6 +170,15 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
}
// ------------------------------------------------------------------------
+ // Shutdown
+ // ------------------------------------------------------------------------
+
+ /**
+ * Closes the ZooKeeper client used by these services.
+ */
+ @Override
+ public void shutdown() throws Exception {
+ client.close();
+ }
+
+ // ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9615f15b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
new file mode 100644
index 0000000..8c15a52
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.BlobStore;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Base for all {@link HighAvailabilityServices} that are not highly available, but are backed
+ * by storage that has no availability guarantees and leader election services that cannot
+ * elect among multiple distributed leader contenders.
+ */
+public abstract class AbstractNonHaServices implements HighAvailabilityServices {
+
+	/** Lock guarding the map of job manager leader services and the shutdown sequence */
+	private final Object lock = new Object();
+
+	/** Executor that is passed to every {@link EmbeddedLeaderService} created by this class */
+	private final ExecutorService executor;
+
+	/** The leader services per job, created lazily on first request. Guarded by 'lock'. */
+	private final HashMap<JobID, EmbeddedLeaderService> jobManagerLeaderServices;
+
+	/** The registry instance returned by {@link #getRunningJobsRegistry()} */
+	private final RunningJobsRegistry runningJobsRegistry;
+
+	/**
+	 * Flag marking these services as shut down. Written under 'lock', but volatile because
+	 * {@link #checkNotShutdown()} is also called from methods that do not hold the lock.
+	 */
+	private volatile boolean shutdown;
+
+	// ------------------------------------------------------------------------
+
+	public AbstractNonHaServices() {
+		this.executor = Executors.newCachedThreadPool(new ServicesThreadFactory());
+		this.jobManagerLeaderServices = new HashMap<>();
+		this.runningJobsRegistry = new NonHaRegistry();
+	}
+
+	// ------------------------------------------------------------------------
+	//  services
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a leader retrieval service backed by the embedded leader service of the given job.
+	 *
+	 * @param jobID The identifier of the job, must not be null.
+	 * @throws IllegalStateException Thrown, if these services are already shut down.
+	 */
+	@Override
+	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
+		checkNotNull(jobID);
+
+		synchronized (lock) {
+			checkNotShutdown();
+			EmbeddedLeaderService service = getOrCreateJobManagerService(jobID);
+			return service.createLeaderRetrievalService();
+		}
+	}
+
+	/**
+	 * Creates a leader election service backed by the embedded leader service of the given job.
+	 *
+	 * @param jobID The identifier of the job, must not be null.
+	 * @throws IllegalStateException Thrown, if these services are already shut down.
+	 */
+	@Override
+	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception {
+		checkNotNull(jobID);
+
+		synchronized (lock) {
+			checkNotShutdown();
+			EmbeddedLeaderService service = getOrCreateJobManagerService(jobID);
+			return service.createLeaderElectionService();
+		}
+	}
+
+	/**
+	 * Returns the leader service registered for the job, creating and registering it
+	 * if there is none yet. Must be called while holding the lock.
+	 */
+	@GuardedBy("lock")
+	private EmbeddedLeaderService getOrCreateJobManagerService(JobID jobID) {
+		EmbeddedLeaderService service = jobManagerLeaderServices.get(jobID);
+		if (service == null) {
+			service = new EmbeddedLeaderService(executor);
+			jobManagerLeaderServices.put(jobID, service);
+		}
+		return service;
+	}
+
+	@Override
+	public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception {
+		checkNotShutdown();
+		return new StandaloneCheckpointRecoveryFactory();
+	}
+
+	@Override
+	public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
+		checkNotShutdown();
+		return new StandaloneSubmittedJobGraphStore();
+	}
+
+	@Override
+	public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
+		checkNotShutdown();
+		return runningJobsRegistry;
+	}
+
+	@Override
+	public BlobStore createBlobStore() throws IOException {
+		checkNotShutdown();
+		return new VoidBlobStore();
+	}
+
+	// ------------------------------------------------------------------------
+	//  shutdown
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Shuts down the executor and all job manager leader services created so far.
+	 * Calling this method more than once has no further effect.
+	 */
+	@Override
+	public void shutdown() throws Exception {
+		synchronized (lock) {
+			if (!shutdown) {
+				shutdown = true;
+
+				// no further calls should be dispatched
+				executor.shutdownNow();
+
+				// stop all job manager leader services
+				for (EmbeddedLeaderService service : jobManagerLeaderServices.values()) {
+					service.shutdown();
+				}
+				jobManagerLeaderServices.clear();
+			}
+		}
+	}
+
+	/**
+	 * Fails with an {@link IllegalStateException} if these services have been shut down.
+	 */
+	private void checkNotShutdown() {
+		checkState(!shutdown, "high availability services are shut down");
+	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the executor of these services, for use by subclasses.
+	 */
+	protected ExecutorService getExecutorService() {
+		return executor;
+	}
+
+	/**
+	 * Thread factory that creates daemon threads with a numbered "Flink HA services thread" name.
+	 */
+	private static final class ServicesThreadFactory implements ThreadFactory {
+
+		/** Counter used to number the created threads */
+		private final AtomicInteger enumerator = new AtomicInteger();
+
+		@Override
+		public Thread newThread(@Nonnull Runnable r) {
+			Thread thread = new Thread(r, "Flink HA services thread #" + enumerator.incrementAndGet());
+			thread.setDaemon(true);
+			return thread;
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9615f15b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
new file mode 100644
index 0000000..84ac551
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha;
+
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A simple leader election service, which selects a leader among contenders and notifies listeners.
+ *
+ * <p>An election service for contenders can be created via {@link #createLeaderElectionService()},
+ * a listener service for leader observers can be created via {@link #createLeaderRetrievalService()}.
+ */
+public class EmbeddedLeaderService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(EmbeddedLeaderService.class);
+
+ private final Object lock = new Object();
+
+ private final Executor notificationExecutor;
+
+ private final Set<EmbeddedLeaderElectionService> allLeaderContenders;
+
+ private final Set<EmbeddedLeaderRetrievalService> listeners;
+
+ /** proposed leader, which has been notified of leadership grant, but has not confirmed */
+ private EmbeddedLeaderElectionService currentLeaderProposed;
+
+ /** actual leader that has confirmed leadership and of which listeners have been notified */
+ private EmbeddedLeaderElectionService currentLeaderConfirmed;
+
+ /** fencing UID for the current leader (or proposed leader) */
+ private UUID currentLeaderSessionId;
+
+ /** the cached address of the current leader */
+ private String currentLeaderAddress;
+
+ /** flag marking the service as terminated */
+ private boolean shutdown;
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a new embedded leader service without any contenders or listeners.
+ *
+ * @param notificationsDispatcher The executor on which leadership grants and leader
+ * notifications are run, must not be null.
+ */
+ public EmbeddedLeaderService(ExecutorService notificationsDispatcher) {
+ this.notificationExecutor = checkNotNull(notificationsDispatcher);
+ this.allLeaderContenders = new HashSet<>();
+ this.listeners = new HashSet<>();
+ }
+
+ // ------------------------------------------------------------------------
+ // shutdown and errors
+ // ------------------------------------------------------------------------
+
+ /**
+ * Shuts down this leader election service.
+ *
+ * <p>This method neither performs a clean revocation of the leader status, nor does it
+ * send a leader notification to any leader listeners. It simply reports an error to all
+ * contenders and listeners, indicating that the service is shutting down.
+ */
+ public void shutdown() {
+ synchronized (lock) {
+ shutdownInternally(new Exception("Leader election service is shutting down"));
+ }
+ }
+
+ /**
+ * Logs the given error and shuts this service down. All contenders and listeners receive
+ * an exception that wraps the error as its cause.
+ *
+ * @param error The error that caused the service to fail.
+ */
+ private void fatalError(Throwable error) {
+ LOG.error("Embedded leader election service encountered a fatal error. Shutting down service.", error);
+
+ // the callers in this class already hold the lock; acquiring it again is safe,
+ // because Java monitors are reentrant
+ synchronized (lock) {
+ shutdownInternally(new Exception("Leader election service is shutting down after a fatal error", error));
+ }
+ }
+
+ /**
+ * Clears the leader state, reports the given exception to all started contenders and
+ * listeners, and marks the service as shut down. Does nothing if the service is already
+ * shut down. Must be called while holding the lock.
+ *
+ * @param exceptionForHandlers The exception passed to the contenders and listeners.
+ */
+ @GuardedBy("lock")
+ private void shutdownInternally(Exception exceptionForHandlers) {
+ assert Thread.holdsLock(lock);
+
+ if (!shutdown) {
+ // clear all leader status
+ currentLeaderProposed = null;
+ currentLeaderConfirmed = null;
+ currentLeaderSessionId = null;
+ currentLeaderAddress = null;
+
+ // fail all registered contenders
+ for (EmbeddedLeaderElectionService service : allLeaderContenders) {
+ service.shutdown(exceptionForHandlers);
+ }
+ allLeaderContenders.clear();
+
+ // fail all registered listeners
+ for (EmbeddedLeaderRetrievalService service : listeners) {
+ service.shutdown(exceptionForHandlers);
+ }
+ listeners.clear();
+
+ shutdown = true;
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // creating contenders and listeners
+ // ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new leader election service for a contender. The returned service takes part
+	 * in the election only after it has been started.
+	 *
+	 * @return A new, not yet started, leader election service.
+	 * @throws IllegalStateException Thrown, if this service is already shut down.
+	 */
+	public LeaderElectionService createLeaderElectionService() {
+		synchronized (lock) {
+			// 'shutdown' is only written under the lock, so read it under the lock as well
+			checkState(!shutdown, "leader election service is shut down");
+		}
+		return new EmbeddedLeaderElectionService();
+	}
+
+	/**
+	 * Creates a new leader retrieval service for a listener. The returned service receives
+	 * leader notifications only after it has been started.
+	 *
+	 * @return A new, not yet started, leader retrieval service.
+	 * @throws IllegalStateException Thrown, if this service is already shut down.
+	 */
+	public LeaderRetrievalService createLeaderRetrievalService() {
+		synchronized (lock) {
+			// 'shutdown' is only written under the lock, so read it under the lock as well
+			checkState(!shutdown, "leader election service is shut down");
+		}
+		return new EmbeddedLeaderRetrievalService();
+	}
+
+ // ------------------------------------------------------------------------
+ // adding and removing contenders & listeners
+ // ------------------------------------------------------------------------
+
+ /**
+ * Callback from leader contenders when they start their service.
+ *
+ * <p>Registers the contender and, if there is currently neither a confirmed nor a proposed
+ * leader, proposes a leader. Errors during registration shut this service down.
+ *
+ * @param service The election service that was started.
+ * @param contender The contender that started the election service.
+ * @throws IllegalStateException Thrown, if this service is shut down, or if the election
+ * service is already running.
+ */
+ void addContender(EmbeddedLeaderElectionService service, LeaderContender contender) {
+ synchronized (lock) {
+ checkState(!shutdown, "leader election service is shut down");
+ checkState(!service.running, "leader election service is already started");
+
+ try {
+ if (!allLeaderContenders.add(service)) {
+ throw new IllegalStateException("leader election service was added to this service multiple times");
+ }
+
+ service.contender = contender;
+ service.running = true;
+
+ updateLeader();
+ }
+ catch (Throwable t) {
+ // not rethrown: the failure is reported through the shutdown of this service
+ fatalError(t);
+ }
+ }
+ }
+
+ /**
+ * Callback from leader contenders when they stop their service.
+ *
+ * <p>Unregisters the contender. If it was the confirmed or the proposed leader, that
+ * status is cleared and a new leader is determined among the remaining contenders.
+ * Does nothing if the election service was not started, or if this service is shut down.
+ *
+ * @param service The election service that was stopped.
+ */
+ void removeContender(EmbeddedLeaderElectionService service) {
+ synchronized (lock) {
+ // if the service was not even started, simply do nothing
+ if (!service.running || shutdown) {
+ return;
+ }
+
+ try {
+ if (!allLeaderContenders.remove(service)) {
+ throw new IllegalStateException("leader election service does not belong to this service");
+ }
+
+ // stop the service
+ service.contender = null;
+ service.running = false;
+ service.isLeader = false;
+
+ // if that was the current leader, unset its status
+ if (currentLeaderConfirmed == service) {
+ currentLeaderConfirmed = null;
+ currentLeaderSessionId = null;
+ currentLeaderAddress = null;
+ }
+ // if that was the proposed (not yet confirmed) leader, withdraw the proposal
+ if (currentLeaderProposed == service) {
+ currentLeaderProposed = null;
+ currentLeaderSessionId = null;
+ }
+
+ // proposes a new leader, or tells the listeners that there is no leader
+ updateLeader();
+ }
+ catch (Throwable t) {
+ fatalError(t);
+ }
+ }
+ }
+
+ /**
+ * Callback from leader contenders when they confirm a leader grant.
+ *
+ * <p>If the confirmation matches the currently proposed leader and session ID, the
+ * contender becomes the confirmed leader and all listeners are notified via the
+ * notification executor. Confirmations for any other grant are ignored.
+ *
+ * @param service The election service of the contender that confirms its leadership.
+ * @param leaderSessionId The session ID of the leadership grant that is confirmed.
+ */
+ void confirmLeader(final EmbeddedLeaderElectionService service, final UUID leaderSessionId) {
+ synchronized (lock) {
+ // if the service was shut down in the meantime, ignore this confirmation
+ if (!service.running || shutdown) {
+ return;
+ }
+
+ try {
+ // check if the confirmation is for the same grant, or whether it is a stale grant
+ if (service == currentLeaderProposed && currentLeaderSessionId.equals(leaderSessionId)) {
+ final String address = service.contender.getAddress();
+ LOG.info("Received confirmation of leadership for leader {} / session={}", address, leaderSessionId);
+
+ // mark leadership
+ currentLeaderConfirmed = service;
+ currentLeaderAddress = address;
+ currentLeaderProposed = null;
+ service.isLeader = true;
+
+ // notify all listeners
+ for (EmbeddedLeaderRetrievalService listener : listeners) {
+ notificationExecutor.execute(
+ new NotifyOfLeaderCall(address, leaderSessionId, listener.listener, LOG));
+ }
+ }
+ else {
+ LOG.debug("Received confirmation of leadership for a stale leadership grant. Ignoring.");
+ }
+ }
+ catch (Throwable t) {
+ fatalError(t);
+ }
+ }
+ }
+
+ /**
+ * Determines a new leader, if there is currently neither a confirmed nor a proposed leader.
+ *
+ * <p>Without any contender, all listeners are notified that there is no leader (null
+ * address and null session ID). Otherwise, one contender is proposed as the leader under
+ * a new random session ID and is asynchronously granted leadership. It becomes the
+ * confirmed leader only once it confirms the grant.
+ */
+ @GuardedBy("lock")
+ private void updateLeader() {
+ // this must be called under the lock
+ assert Thread.holdsLock(lock);
+
+ if (currentLeaderConfirmed == null && currentLeaderProposed == null) {
+ // we need a new leader
+ if (allLeaderContenders.isEmpty()) {
+ // no new leader available, tell everyone that there is no leader currently
+ for (EmbeddedLeaderRetrievalService listener : listeners) {
+ notificationExecutor.execute(
+ new NotifyOfLeaderCall(null, null, listener.listener, LOG));
+ }
+ }
+ else {
+ // propose a leader and ask it
+ final UUID leaderSessionId = UUID.randomUUID();
+ // takes whichever contender the set's iterator returns first
+ EmbeddedLeaderElectionService leaderService = allLeaderContenders.iterator().next();
+
+ currentLeaderSessionId = leaderSessionId;
+ currentLeaderProposed = leaderService;
+
+ notificationExecutor.execute(
+ new GrantLeadershipCall(leaderService.contender, leaderSessionId, LOG));
+ }
+ }
+ }
+
+ /**
+ * Callback from leader retrieval services when they are started.
+ *
+ * <p>Registers the listener. If a leader is already confirmed, the new listener is
+ * asynchronously notified of that leader. Errors during registration shut this service down.
+ *
+ * @param service The retrieval service that was started.
+ * @param listener The listener that started the retrieval service.
+ * @throws IllegalStateException Thrown, if this service is shut down, or if the retrieval
+ * service is already running.
+ */
+ void addListener(EmbeddedLeaderRetrievalService service, LeaderRetrievalListener listener) {
+ synchronized (lock) {
+ checkState(!shutdown, "leader election service is shut down");
+ checkState(!service.running, "leader retrieval service is already started");
+
+ try {
+ if (!listeners.add(service)) {
+ throw new IllegalStateException("leader retrieval service was added to this service multiple times");
+ }
+
+ service.listener = listener;
+ service.running = true;
+
+ // if we already have a leader, immediately notify this new listener
+ if (currentLeaderConfirmed != null) {
+ notificationExecutor.execute(
+ new NotifyOfLeaderCall(currentLeaderAddress, currentLeaderSessionId, listener, LOG));
+ }
+ }
+ catch (Throwable t) {
+ fatalError(t);
+ }
+ }
+ }
+
+ /**
+ * Callback from leader retrieval services when they are stopped.
+ *
+ * <p>Unregisters the listener. Does nothing if the retrieval service was not started,
+ * or if this service is shut down.
+ *
+ * @param service The retrieval service that was stopped.
+ */
+ void removeListener(EmbeddedLeaderRetrievalService service) {
+ synchronized (lock) {
+ // if the service was not even started, simply do nothing
+ if (!service.running || shutdown) {
+ return;
+ }
+
+ try {
+ if (!listeners.remove(service)) {
+ throw new IllegalStateException("leader retrieval service does not belong to this service");
+ }
+
+ // stop the service
+ service.listener = null;
+ service.running = false;
+ }
+ catch (Throwable t) {
+ fatalError(t);
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // election and retrieval service implementations
+ // ------------------------------------------------------------------------
+
+ /**
+ * The election service handed to a single leader contender. All calls are forwarded to
+ * the enclosing {@link EmbeddedLeaderService}, which also maintains the fields below.
+ */
+ private class EmbeddedLeaderElectionService implements LeaderElectionService {
+
+ /** the contender that started this service, null while the service is not running */
+ volatile LeaderContender contender;
+
+ /** true while the contender of this service is the confirmed leader */
+ volatile boolean isLeader;
+
+ /** true between start and stop (or shutdown) of this service */
+ volatile boolean running;
+
+ @Override
+ public void start(LeaderContender contender) throws Exception {
+ checkNotNull(contender);
+ addContender(this, contender);
+ }
+
+ @Override
+ public void stop() throws Exception {
+ removeContender(this);
+ }
+
+ @Override
+ public void confirmLeaderSessionID(UUID leaderSessionID) {
+ checkNotNull(leaderSessionID);
+ confirmLeader(this, leaderSessionID);
+ }
+
+ @Override
+ public boolean hasLeadership() {
+ // read without holding the lock of the enclosing service
+ return isLeader;
+ }
+
+ /**
+ * Stops this service, if it is running, and reports the given cause to the contender.
+ */
+ void shutdown(Exception cause) {
+ if (running) {
+ running = false;
+ isLeader = false;
+ contender.handleError(cause);
+ contender = null;
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * The retrieval service handed to a single leader listener. Start and stop are forwarded
+ * to the enclosing {@link EmbeddedLeaderService}, which also maintains the fields below.
+ */
+ private class EmbeddedLeaderRetrievalService implements LeaderRetrievalService {
+
+ /** the listener that started this service, null while the service is not running */
+ volatile LeaderRetrievalListener listener;
+
+ /** true between start and stop (or shutdown) of this service */
+ volatile boolean running;
+
+ @Override
+ public void start(LeaderRetrievalListener listener) throws Exception {
+ checkNotNull(listener);
+ addListener(this, listener);
+ }
+
+ @Override
+ public void stop() throws Exception {
+ removeListener(this);
+ }
+
+ /**
+ * Stops this service, if it is running, and reports the given cause to the listener.
+ */
+ public void shutdown(Exception cause) {
+ if (running) {
+ running = false;
+ listener.handleError(cause);
+ listener = null;
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // asynchronous notifications
+ // ------------------------------------------------------------------------
+
+ /**
+ * Runnable that notifies a single listener of the leader address and leader session ID.
+ * If the notification throws, the error is logged and passed to the listener's error handler.
+ */
+ private static class NotifyOfLeaderCall implements Runnable {
+
+ @Nullable
+ private final String address; // null if leader revoked without new leader
+ @Nullable
+ private final UUID leaderSessionId; // null if leader revoked without new leader
+
+ private final LeaderRetrievalListener listener;
+ private final Logger logger;
+
+ NotifyOfLeaderCall(
+ @Nullable String address,
+ @Nullable UUID leaderSessionId,
+ LeaderRetrievalListener listener,
+ Logger logger) {
+
+ this.address = address;
+ this.leaderSessionId = leaderSessionId;
+ this.listener = checkNotNull(listener);
+ this.logger = checkNotNull(logger);
+ }
+
+ @Override
+ public void run() {
+ try {
+ listener.notifyLeaderAddress(address, leaderSessionId);
+ }
+ catch (Throwable t) {
+ logger.warn("Error notifying leader listener about new leader", t);
+ // handleError is called with an Exception; wrap anything that is not one
+ listener.handleError(t instanceof Exception ? (Exception) t : new Exception(t));
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+	/**
+	 * Runnable that grants leadership under a given leader session ID to a single contender.
+	 * If the grant throws, the error is logged and passed to the contender's error handler.
+	 */
+	private static class GrantLeadershipCall implements Runnable {
+
+		private final LeaderContender contender;
+		private final UUID leaderSessionId;
+		private final Logger logger;
+
+		GrantLeadershipCall(
+				LeaderContender contender,
+				UUID leaderSessionId,
+				Logger logger) {
+
+			this.contender = checkNotNull(contender);
+			this.leaderSessionId = checkNotNull(leaderSessionId);
+			this.logger = checkNotNull(logger);
+		}
+
+		@Override
+		public void run() {
+			try {
+				contender.grantLeadership(leaderSessionId);
+			}
+			catch (Throwable t) {
+				// this call addresses a contender, not a listener, so the message says so
+				logger.warn("Error granting leadership to contender", t);
+				// handleError is called with an Exception; wrap anything that is not one
+				contender.handleError(t instanceof Exception ? (Exception) t : new Exception(t));
+			}
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9615f15b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 38e372d..3e88e8c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -154,4 +154,13 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
public BlobStore createBlobStore() throws IOException {
return new VoidBlobStore();
}
+
+ // ------------------------------------------------------------------------
+ // Shutdown
+ // ------------------------------------------------------------------------
+
+ /**
+ * Does nothing in this testing implementation.
+ */
+ @Override
+ public void shutdown() throws Exception {
+ // nothing to do, since this should not shut down individual services, but cross service parts
+ }
}