Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/16 08:43:25 UTC

[GitHub] [flink] xintongsong commented on a change in pull request #13644: [FLINK-19542][k8s]Implement LeaderElectionService and LeaderRetrievalService based on Kubernetes API

xintongsong commented on a change in pull request #13644:
URL: https://github.com/apache/flink/pull/13644#discussion_r505312805



##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesWatcher.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Watcher for resources in Kubernetes.
+ */
+public abstract class KubernetesWatcher<T extends HasMetadata, K extends KubernetesResource<T>> implements Watcher<T> {

Review comment:
       Let's rename this class to `AbstractKubernetesWatcher`.

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
##########
@@ -258,6 +258,13 @@
 			.withDescription("If configured, Flink will add \"resources.limits.<config-key>\" and \"resources.requests.<config-key>\" " +
 				"to the main container of TaskExecutor and set the value to the value of " + ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT.key() + ".");
 
+	public static final ConfigOption<Integer> KUBERNETES_MAX_RETRY_ATTEMPTS =
+		key("kubernetes.client.max-retry-attempts")
+			.intType()
+			.defaultValue(5)
+			.withDescription("Defines the number of Kubernetes resources update operation retries before the client " +
+				"gives up. For example, updating the ConfigMap.");
+

Review comment:
       I think "Kubernetes resources update operation" is a bit too general. E.g., one could argue that creating a new pod is also a "Kubernetes resources update operation".
   
   I would suggest `kubernetes.transactional-operation.max-retries` as the configuration key, and explain what a transactional operation is (a group of operations that are guaranteed to be atomic) in the description.

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesConfigMapWatcher.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+
+import java.util.Collections;
+
+/**
+ * Watcher for ConfigMaps in Kubernetes.
+ */
+public class KubernetesConfigMapWatcher extends KubernetesWatcher<ConfigMap, KubernetesConfigMap> {
+
+	public KubernetesConfigMapWatcher(FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap> callbackHandler) {
+		super(callbackHandler);
+	}
+
+	@Override
+	public void eventReceived(Action action, ConfigMap configMap) {
+		logger.debug("Received {} event for configMap {}, details: {}",
+			action, configMap.getMetadata().getName(), configMap.getData());
+		switch (action) {
+			case ADDED:
+				callbackHandler.onAdded(Collections.singletonList(new KubernetesConfigMap(configMap)));
+				break;
+			case MODIFIED:
+				callbackHandler.onModified(Collections.singletonList(new KubernetesConfigMap(configMap)));
+				break;
+			case ERROR:
+				callbackHandler.onError(Collections.singletonList(new KubernetesConfigMap(configMap)));
+				break;
+			case DELETED:
+				callbackHandler.onDeleted(Collections.singletonList(new KubernetesConfigMap(configMap)));
+				break;

Review comment:
       Minor: we can deduplicate the code by creating the singleton list once, before the `switch`.
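
A minimal sketch of that deduplication. It uses simplified, hypothetical stand-ins (`Action`, `WatchCallbackHandler`, `ConfigMapResource`) instead of the real fabric8 and Flink types so that it compiles on its own. In the real watcher, the `new KubernetesConfigMap(configMap)` wrapping would likewise happen only once, before the `switch`.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for fabric8's Watcher.Action.
enum Action { ADDED, MODIFIED, DELETED, ERROR }

// Hypothetical, simplified stand-in for FlinkKubeClient.WatchCallbackHandler.
interface WatchCallbackHandler<T> {
    void onAdded(List<T> resources);
    void onModified(List<T> resources);
    void onDeleted(List<T> resources);
    void onError(List<T> resources);
}

// Hypothetical wrapper standing in for KubernetesConfigMap.
final class ConfigMapResource {
    final String name;

    ConfigMapResource(String name) {
        this.name = name;
    }
}

final class ConfigMapWatcherSketch {
    private final WatchCallbackHandler<ConfigMapResource> callbackHandler;

    ConfigMapWatcherSketch(WatchCallbackHandler<ConfigMapResource> callbackHandler) {
        this.callbackHandler = callbackHandler;
    }

    void eventReceived(Action action, ConfigMapResource configMap) {
        // Build the singleton list once instead of once per case.
        final List<ConfigMapResource> resources = Collections.singletonList(configMap);
        switch (action) {
            case ADDED:
                callbackHandler.onAdded(resources);
                break;
            case MODIFIED:
                callbackHandler.onModified(resources);
                break;
            case DELETED:
                callbackHandler.onDeleted(resources);
                break;
            case ERROR:
                callbackHandler.onError(resources);
                break;
            default:
                // Any other action is ignored in this sketch.
                break;
        }
    }
}
```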

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesConfigMapWatcher.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+
+import java.util.Collections;
+
+/**
+ * Watcher for ConfigMaps in Kubernetes.

Review comment:
       ```suggestion
    * Watcher for {@link ConfigMap}s in Kubernetes.
   ```

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesConfigMap.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Represent KubernetesConfigMap resource in kubernetes.

Review comment:
       ```suggestion
    * Represent {@link ConfigMap} resource in kubernetes.
   ```

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
##########
@@ -104,6 +107,67 @@ KubernetesWatch watchPodsAndDoCallback(
 		Map<String, String> labels,
 		WatchCallbackHandler<KubernetesPod> podCallbackHandler);
 
+	/**
+	 * Create the ConfigMap with specified content. If the ConfigMap already exists, nothing will happen.
+	 *
+	 * @param configMap ConfigMap.
+	 *
+	 * @return Return the ConfigMap create future.
+	 */
+	CompletableFuture<Void> createConfigMap(KubernetesConfigMap configMap);
+
+	/**
+	 * Get the ConfigMap with specified name.
+	 *
+	 * @param name ConfigMap name.
+	 *
+	 * @return Return empty if the ConfigMap does not exist.

Review comment:
       ```suggestion
   	 * @return Return the ConfigMap, or empty if the ConfigMap does not exist.
   ```

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
##########
@@ -104,6 +107,67 @@ KubernetesWatch watchPodsAndDoCallback(
 		Map<String, String> labels,
 		WatchCallbackHandler<KubernetesPod> podCallbackHandler);
 
+	/**
+	 * Create the ConfigMap with specified content. If the ConfigMap already exists, nothing will happen.
+	 *
+	 * @param configMap ConfigMap.
+	 *
+	 * @return Return the ConfigMap create future.
+	 */
+	CompletableFuture<Void> createConfigMap(KubernetesConfigMap configMap);

Review comment:
       I would suggest naming this method `createConfigMapIfAbsent`, to make it explicit that nothing will happen if the config map already exists.
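
A sketch of the semantics the name `createConfigMapIfAbsent` would promise, using a hypothetical in-memory client (`InMemoryConfigMapClient` is not a Flink class). Unlike the `getConfigMap(...).isPresent()` check followed by `create(...)` in `Fabric8FlinkKubeClient#createConfigMap`, `ConcurrentHashMap#putIfAbsent` performs the check and the insert as one atomic step. With the real client, two concurrent callers could both see the ConfigMap as absent, and the second `create` would then be expected to fail with an AlreadyExists (HTTP 409) response from the API server.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;

// Hypothetical in-memory stand-in for the ConfigMap part of FlinkKubeClient.
final class InMemoryConfigMapClient {
    private final ConcurrentMap<String, String> configMaps = new ConcurrentHashMap<>();
    private final Executor executor;

    InMemoryConfigMapClient(Executor executor) {
        this.executor = executor;
    }

    /**
     * Creates the ConfigMap only if none with the same name exists yet. An existing
     * ConfigMap is left untouched, and the future completes normally in both cases.
     */
    CompletableFuture<Void> createConfigMapIfAbsent(String name, String data) {
        // putIfAbsent is atomic: concurrent callers cannot both "create" the entry.
        return CompletableFuture.runAsync(() -> configMaps.putIfAbsent(name, data), executor);
    }

    String get(String name) {
        return configMaps.get(name);
    }
}
```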

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
##########
@@ -219,6 +230,68 @@ public KubernetesWatch watchPodsAndDoCallback(
 				.watch(new KubernetesPodsWatcher(podCallbackHandler)));
 	}
 
+	@Override
+	public CompletableFuture<Void> createConfigMap(KubernetesConfigMap configMap) {
+		return CompletableFuture.runAsync(
+			() -> {
+				if (!getConfigMap(configMap.getName()).isPresent()) {
+					this.internalClient.configMaps().create(configMap.getInternalResource());
+				}
+			},
+			kubeClientExecutorService);

Review comment:
       Not related to this PR, but I think we can already replace `kubeClientExecutorService` with `AbstractResourceManagerDriver#ioExecutor`.

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
##########
@@ -104,6 +107,67 @@ KubernetesWatch watchPodsAndDoCallback(
 		Map<String, String> labels,
 		WatchCallbackHandler<KubernetesPod> podCallbackHandler);
 
+	/**
+	 * Create the ConfigMap with specified content. If the ConfigMap already exists, nothing will happen.
+	 *
+	 * @param configMap ConfigMap.
+	 *
+	 * @return Return the ConfigMap create future.
+	 */
+	CompletableFuture<Void> createConfigMap(KubernetesConfigMap configMap);
+
+	/**
+	 * Get the ConfigMap with specified name.
+	 *
+	 * @param name ConfigMap name.
+	 *
+	 * @return Return empty if the ConfigMap does not exist.
+	 */
+	Optional<KubernetesConfigMap> getConfigMap(String name);
+
+	/**
+	 * Update an existing ConfigMap with the data.
+	 *
+	 * @param configMapName ConfigMap to be replaced with. Benefit from <a href=https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions>
+	 *                      resource version</a> and combined with {@link #getConfigMap(String)}, we could perform a get-check-and-update
+	 *                      transactional operation. Since concurrent modification could happen on a same ConfigMap,
+	 *                      the update operation may fail. We need to retry internally. The max retry attempts could be
+	 *                      configured via {@link org.apache.flink.kubernetes.configuration.KubernetesConfigOptions#KUBERNETES_MAX_RETRY_ATTEMPTS}.
+	 * @param checker       Only the checker return true, the ConfigMap will be updated.
+	 * @param function      The obtained ConfigMap will be applied to this function and get a new one to replace.
+	 *
+	 * @return Return the ConfigMap update future.
+	 */
+	CompletableFuture<Boolean> checkAndUpdateConfigMap(
+		String configMapName,
+		Predicate<KubernetesConfigMap> checker,
+		FunctionWithException<KubernetesConfigMap, KubernetesConfigMap, ?> function);

Review comment:
       I think we can get rid of the argument `checker` and make `function` return an `Optional<KubernetesConfigMap>` to indicate whether and how to update the config map. This should simplify the interface and make its contract easier to understand.
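
A rough sketch of the combined contract, assuming a hypothetical in-memory store in which a per-ConfigMap `version` counter plays the role of the Kubernetes `resourceVersion`. The single update function returns `Optional.empty()` for "do not update", which replaces the separate `checker`. A write only succeeds if the ConfigMap has not changed since it was read; otherwise it is retried, up to `maxRetries` times. On a real API server, an update carrying a stale `resourceVersion` is rejected with HTTP 409 Conflict, which is what would trigger the retry. None of these names are Flink APIs.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Function;

// Immutable snapshot of a ConfigMap: its data plus a resourceVersion-like counter (hypothetical).
final class VersionedConfigMap {
    final long version;
    final Map<String, String> data;

    VersionedConfigMap(long version, Map<String, String> data) {
        this.version = version;
        this.data = data;
    }
}

final class OptimisticConfigMapStore {
    private final ConcurrentHashMap<String, VersionedConfigMap> store = new ConcurrentHashMap<>();
    private final int maxRetries;
    private final Executor executor;

    OptimisticConfigMapStore(int maxRetries, Executor executor) {
        this.maxRetries = maxRetries;
        this.executor = executor;
    }

    // Creates the entry at version 0, or replaces it and bumps the version (an unconditional write).
    void put(String name, Map<String, String> data) {
        store.merge(name, new VersionedConfigMap(0L, data),
            (old, fresh) -> new VersionedConfigMap(old.version + 1, fresh.data));
    }

    Optional<VersionedConfigMap> get(String name) {
        return Optional.ofNullable(store.get(name));
    }

    /**
     * Applies updateFunction to the current data. An empty result means "do not update" and
     * completes the future with false. Otherwise the new data is written only if nobody changed
     * the ConfigMap in between; on conflict, the read-apply-write cycle is repeated
     * (one initial attempt plus up to maxRetries retries).
     */
    CompletableFuture<Boolean> checkAndUpdateConfigMap(
            String name, Function<Map<String, String>, Optional<Map<String, String>>> updateFunction) {
        return CompletableFuture.supplyAsync(() -> {
            for (int attempt = 0; attempt <= maxRetries; attempt++) {
                final VersionedConfigMap current = store.get(name);
                if (current == null) {
                    throw new IllegalStateException("ConfigMap " + name + " does not exist.");
                }
                final Optional<Map<String, String>> updated = updateFunction.apply(current.data);
                if (!updated.isPresent()) {
                    return false; // the function decided not to update
                }
                final VersionedConfigMap next = new VersionedConfigMap(current.version + 1, updated.get());
                // Compare-and-set on the snapshot object we read: fails if someone wrote in between.
                if (store.replace(name, current, next)) {
                    return true;
                }
            }
            throw new IllegalStateException("Gave up updating ConfigMap " + name + " after retries.");
        }, executor);
    }
}
```

With this shape, a leader check such as the one in `KubernetesUtils#getLeaderChecker` would move into the update function itself, which simply returns `Optional.empty()` when the lock annotation does not match.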

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
##########
@@ -104,6 +107,67 @@ KubernetesWatch watchPodsAndDoCallback(
 		Map<String, String> labels,
 		WatchCallbackHandler<KubernetesPod> podCallbackHandler);
 
+	/**
+	 * Create the ConfigMap with specified content. If the ConfigMap already exists, nothing will happen.
+	 *
+	 * @param configMap ConfigMap.
+	 *
+	 * @return Return the ConfigMap create future.
+	 */
+	CompletableFuture<Void> createConfigMap(KubernetesConfigMap configMap);
+
+	/**
+	 * Get the ConfigMap with specified name.
+	 *
+	 * @param name ConfigMap name.
+	 *
+	 * @return Return empty if the ConfigMap does not exist.
+	 */
+	Optional<KubernetesConfigMap> getConfigMap(String name);
+
+	/**
+	 * Update an existing ConfigMap with the data.
+	 *
+	 * @param configMapName ConfigMap to be replaced with. Benefit from <a href=https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions>
+	 *                      resource version</a> and combined with {@link #getConfigMap(String)}, we could perform a get-check-and-update
+	 *                      transactional operation. Since concurrent modification could happen on a same ConfigMap,
+	 *                      the update operation may fail. We need to retry internally. The max retry attempts could be
+	 *                      configured via {@link org.apache.flink.kubernetes.configuration.KubernetesConfigOptions#KUBERNETES_MAX_RETRY_ATTEMPTS}.
+	 * @param checker       Only the checker return true, the ConfigMap will be updated.
+	 * @param function      The obtained ConfigMap will be applied to this function and get a new one to replace.
+	 *
+	 * @return Return the ConfigMap update future.
+	 */

Review comment:
       Minor: it might be better to explain the contract details before the parameters and keep the parameter docs brief.

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
##########
@@ -219,6 +230,68 @@ public KubernetesWatch watchPodsAndDoCallback(
 				.watch(new KubernetesPodsWatcher(podCallbackHandler)));
 	}
 
+	@Override
+	public CompletableFuture<Void> createConfigMap(KubernetesConfigMap configMap) {
+		return CompletableFuture.runAsync(
+			() -> {
+				if (!getConfigMap(configMap.getName()).isPresent()) {
+					this.internalClient.configMaps().create(configMap.getInternalResource());
+				}
+			},
+			kubeClientExecutorService);
+	}
+
+	@Override
+	public Optional<KubernetesConfigMap> getConfigMap(String name) {
+		final ConfigMap configMap = this.internalClient.configMaps().inNamespace(namespace).withName(name).get();
+		return configMap == null ? Optional.empty() : Optional.of(new KubernetesConfigMap(configMap));
+	}
+
+	@Override
+	public CompletableFuture<Boolean> checkAndUpdateConfigMap(
+			String configMapName,
+			Predicate<KubernetesConfigMap> checker,
+			FunctionWithException<KubernetesConfigMap, KubernetesConfigMap, ?> function) {
+		return FutureUtils.retry(
+			() -> CompletableFuture.supplyAsync(
+				() -> getConfigMap(configMapName)
+					.map(FunctionUtils.uncheckedFunction(configMap -> {
+						final boolean shouldUpdate = checker.test(configMap);
+						if (!shouldUpdate) {
+							LOG.warn("Trying to update ConfigMap {} to {} without checking pass, ignoring.",
+								configMap.getName(), configMap.getData());
+						} else {
+							this.internalClient.configMaps()
+								.inNamespace(namespace)
+								.createOrReplace(function.apply(configMap).getInternalResource());
+						}
+						return shouldUpdate;
+					}))
+					.orElseThrow(
+						() -> new FlinkRuntimeException("ConfigMap " + configMapName + " not exists.")),
+				kubeClientExecutorService),
+			maxRetryAttempts,
+			kubeClientExecutorService);
+	}

Review comment:
       Combining `checker` and `function` into a function that returns `Optional<KubernetesConfigMap>` should help simplify this implementation.

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
##########
@@ -50,6 +52,8 @@
 	public static final String LABEL_COMPONENT_KEY = "component";
 	public static final String LABEL_COMPONENT_JOB_MANAGER = "jobmanager";
 	public static final String LABEL_COMPONENT_TASK_MANAGER = "taskmanager";
+	public static final String LABEL_CONFIGMAP_TYPE_KEY = "configmap-type";
+	public static final String LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY = "high-availability";

Review comment:
       This change does not belong to this commit.

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
##########
@@ -80,4 +84,14 @@
 	public static final String RESTART_POLICY_OF_NEVER = "Never";
 
 	public static final String NATIVE_KUBERNETES_COMMAND = "native-k8s";
+
+	// Constants for Kubernetes high availability
+	public static final String LEADER_ADDRESS_KEY = "address";
+	public static final String LEADER_SESSION_ID_KEY = "sessionId";
+	public static final String CHECKPOINT_COUNTER_KEY = "counter";
+	public static final String RUNNING_JOBS_REGISTRY_KEY_PREFIX = "runningJobsRegistry";
+	public static final String JOB_GRAPH_STORE_KEY_PREFIX = "jobGraph";
+
+	public static final String LOCK_IDENTITY = UUID.randomUUID().toString();
+	public static final String LEADER_ANNOTATION_KEY = "control-plane.alpha.kubernetes.io/leader";

Review comment:
       These changes do not belong to this commit.

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
##########
@@ -104,6 +107,67 @@ KubernetesWatch watchPodsAndDoCallback(
 		Map<String, String> labels,
 		WatchCallbackHandler<KubernetesPod> podCallbackHandler);
 
+	/**
+	 * Create the ConfigMap with specified content. If the ConfigMap already exists, nothing will happen.
+	 *
+	 * @param configMap ConfigMap.
+	 *
+	 * @return Return the ConfigMap create future.
+	 */
+	CompletableFuture<Void> createConfigMap(KubernetesConfigMap configMap);
+
+	/**
+	 * Get the ConfigMap with specified name.
+	 *
+	 * @param name ConfigMap name.
+	 *
+	 * @return Return empty if the ConfigMap does not exist.
+	 */
+	Optional<KubernetesConfigMap> getConfigMap(String name);
+
+	/**
+	 * Update an existing ConfigMap with the data.
+	 *
+	 * @param configMapName ConfigMap to be replaced with. Benefit from <a href=https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions>
+	 *                      resource version</a> and combined with {@link #getConfigMap(String)}, we could perform a get-check-and-update
+	 *                      transactional operation. Since concurrent modification could happen on a same ConfigMap,
+	 *                      the update operation may fail. We need to retry internally. The max retry attempts could be
+	 *                      configured via {@link org.apache.flink.kubernetes.configuration.KubernetesConfigOptions#KUBERNETES_MAX_RETRY_ATTEMPTS}.
+	 * @param checker       Only the checker return true, the ConfigMap will be updated.
+	 * @param function      The obtained ConfigMap will be applied to this function and get a new one to replace.
+	 *
+	 * @return Return the ConfigMap update future.
+	 */
+	CompletableFuture<Boolean> checkAndUpdateConfigMap(
+		String configMapName,
+		Predicate<KubernetesConfigMap> checker,
+		FunctionWithException<KubernetesConfigMap, KubernetesConfigMap, ?> function);
+
+	/**
+	 * Watch the ConfigMaps with specified name and do the {@link WatchCallbackHandler}.
+	 *
+	 * @param name name to filter the ConfigMaps to watch
+	 * @param callbackHandler callbackHandler which reacts to ConfigMap events
+	 * @return Return a watch for ConfigMaps. It needs to be closed after use.
+	 */
+	KubernetesWatch watchConfigMapsAndDoCallback(

Review comment:
       ```suggestion
   	KubernetesWatch watchConfigMaps(
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.util.UUID;
+
+/**
+ * Abstract class for leader election service based on distributed coordination system(e.g. Zookeeper, Kubernetes, etc.).
+ */
+public abstract class AbstractLeaderElectionService implements LeaderElectionService {
+
+	protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+	protected final Object lock = new Object();
+
+	/** The leader contender which applies for leadership. */
+	protected volatile LeaderContender leaderContender;
+
+	private volatile UUID issuedLeaderSessionID;
+
+	protected volatile UUID confirmedLeaderSessionID;
+
+	protected volatile String confirmedLeaderAddress;
+
+	protected volatile boolean running;
+
+	protected AbstractLeaderElectionService() {
+		leaderContender = null;
+
+		issuedLeaderSessionID = null;
+		confirmedLeaderSessionID = null;
+		confirmedLeaderAddress = null;
+
+		running = false;
+	}
+
+	@Override
+	public final void start(LeaderContender contender) throws Exception {
+		Preconditions.checkNotNull(contender, "Contender must not be null.");
+		Preconditions.checkState(leaderContender == null, "Contender was already set.");
+
+		logger.info("Starting LeaderElectionService {}.", this);
+
+		synchronized (lock) {
+			leaderContender = contender;
+			running = true;
+			internalStart(contender);
+		}
+	}
+
+	@Override
+	public final void stop() throws Exception {
+		synchronized (lock) {
+			if (!running) {
+				return;
+			}
+			running = false;
+			clearConfirmedLeaderInformation();
+		}
+
+		logger.info("Stopping LeaderElectionService {}.", this);
+
+		internalStop();
+	}

Review comment:
       I noticed that `internalStart` is called from inside the `synchronized` block, while `internalStop` is called from outside it. I think this is a bit implicit and might become hard to maintain: implementations extending this abstract class can easily overlook the difference.
   
   I wonder how much it would hurt to move `internalStop` inside the `synchronized` block. It might hold the lock longer and block other threads from entering the code protected by this lock, which should be fine given that the service is being stopped anyway. In return, we gain better maintainability.
   
   WDYT?
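
A sketch of the suggested contract, in which both hooks run while holding `lock`. `LockedLeaderElectionService` and `Contender` are hypothetical simplifications of `AbstractLeaderElectionService` and `LeaderContender`; clearing the confirmed leader information is omitted.

```java
import java.util.Objects;

// Hypothetical marker type standing in for Flink's LeaderContender.
interface Contender {}

abstract class LockedLeaderElectionService {
    protected final Object lock = new Object();
    private Contender contender;
    private boolean running;

    public final void start(Contender newContender) throws Exception {
        Objects.requireNonNull(newContender, "Contender must not be null.");
        synchronized (lock) {
            if (contender != null) {
                throw new IllegalStateException("Contender was already set.");
            }
            contender = newContender;
            running = true;
            internalStart(newContender); // hook runs while holding the lock
        }
    }

    public final void stop() throws Exception {
        synchronized (lock) {
            if (!running) {
                return;
            }
            running = false;
            internalStop(); // same locking contract as internalStart
        }
    }

    /** Called while holding {@code lock}. */
    protected abstract void internalStart(Contender contender) throws Exception;

    /** Called while holding {@code lock}; other users of the lock wait until it returns. */
    protected abstract void internalStop() throws Exception;
}
```

Documenting the locking contract on both abstract hooks, as above, is one way to keep subclasses from overlooking it.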

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
##########
@@ -196,6 +224,16 @@ public static String getCommonStartCommand(
 		).collect(Collectors.toList());
 	}
 
+	public static Predicate<KubernetesConfigMap> getLeaderChecker() {
+		return configMap -> {
+			if (configMap.getAnnotations() != null) {
+				final String leader = configMap.getAnnotations().get(LEADER_ANNOTATION_KEY);
+				return leader != null && leader.contains(LOCK_IDENTITY);
+			}
+			return false;
+		};
+	}

Review comment:
       This change is not a common ConfigMap operation. It is closely related to leader election, so it does not belong to this commit.

##########
File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java
##########
@@ -103,6 +121,52 @@ public KubernetesWatch watchPodsAndDoCallback(Map<String, String> labels, WatchC
 		return watchPodsAndDoCallbackFunction.apply(labels, podCallbackHandler);
 	}
 
+	@Override
+	public CompletableFuture<Void> createConfigMap(KubernetesConfigMap configMap) {
+		configMapStore.putIfAbsent(configMap.getName(), configMap);
+		return CompletableFuture.completedFuture(null);
+	}
+
+	@Override
+	public Optional<KubernetesConfigMap> getConfigMap(String name) {
+		final KubernetesConfigMap configMap = configMapStore.get(name);
+		if (configMap == null) {
+			return Optional.empty();
+		}
+		return Optional.of(new MockKubernetesConfigMap(configMap.getName(), new HashMap<>(configMap.getData())));
+	}
+
+	@Override
+	public CompletableFuture<Boolean> checkAndUpdateConfigMap(
+			String configMapName,
+			Predicate<KubernetesConfigMap> checker,
+			FunctionWithException<KubernetesConfigMap, KubernetesConfigMap, ?> function) {
+		return getConfigMap(configMapName).map(FunctionUtils.uncheckedFunction(
+			configMap -> {
+				final boolean shouldUpdate = checker.test(configMap);
+				if (shouldUpdate) {
+					configMapStore.put(configMap.getName(), function.apply(configMap));
+				}
+				return CompletableFuture.completedFuture(shouldUpdate);
+			}))
+			.orElseThrow(() -> new FlinkRuntimeException("ConfigMap " + configMapName + " not exists."));
+	}

Review comment:
       I'm not sure about having a `configMapStore` and these implementations in the testing class. I would suggest having `createConfigMapFunction`, `getConfigMapFunction` and `checkAndUpdateConfigMapFunction` instead.
   
   The current implementation is less flexible. E.g., we cannot simulate situations where config maps are deleted externally.
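
A sketch of the function-based testing client, assuming the `Optional`-returning update function suggested earlier in this review and a hypothetical `TestConfigMap` value type (neither `TestingConfigMapClient` nor `TestConfigMap` is an existing Flink class). Every call is delegated to a function supplied by the test, so the test decides how each operation behaves.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical value type standing in for KubernetesConfigMap.
final class TestConfigMap {
    final String name;

    TestConfigMap(String name) {
        this.name = name;
    }
}

// Each operation delegates to a test-supplied function instead of a built-in store.
final class TestingConfigMapClient {
    private final Function<TestConfigMap, CompletableFuture<Void>> createConfigMapFunction;
    private final Function<String, Optional<TestConfigMap>> getConfigMapFunction;
    private final BiFunction<String, Function<TestConfigMap, Optional<TestConfigMap>>, CompletableFuture<Boolean>>
        checkAndUpdateConfigMapFunction;

    TestingConfigMapClient(
            Function<TestConfigMap, CompletableFuture<Void>> createConfigMapFunction,
            Function<String, Optional<TestConfigMap>> getConfigMapFunction,
            BiFunction<String, Function<TestConfigMap, Optional<TestConfigMap>>, CompletableFuture<Boolean>>
                checkAndUpdateConfigMapFunction) {
        this.createConfigMapFunction = createConfigMapFunction;
        this.getConfigMapFunction = getConfigMapFunction;
        this.checkAndUpdateConfigMapFunction = checkAndUpdateConfigMapFunction;
    }

    CompletableFuture<Void> createConfigMapIfAbsent(TestConfigMap configMap) {
        return createConfigMapFunction.apply(configMap);
    }

    Optional<TestConfigMap> getConfigMap(String name) {
        return getConfigMapFunction.apply(name);
    }

    CompletableFuture<Boolean> checkAndUpdateConfigMap(
            String name, Function<TestConfigMap, Optional<TestConfigMap>> updateFunction) {
        return checkAndUpdateConfigMapFunction.apply(name, updateFunction);
    }
}
```

For example, to simulate a ConfigMap that was deleted externally, a test could pass `name -> Optional.empty()` as the get function and a function returning an exceptionally completed future as the check-and-update function.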

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.util.UUID;
+
+/**
+ * Abstract class for leader election service based on distributed coordination system(e.g. Zookeeper, Kubernetes, etc.).
+ */
+public abstract class AbstractLeaderElectionService implements LeaderElectionService {
+
+	protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+	protected final Object lock = new Object();
+
+	/** The leader contender which applies for leadership. */
+	protected volatile LeaderContender leaderContender;
+
+	private volatile UUID issuedLeaderSessionID;
+
+	protected volatile UUID confirmedLeaderSessionID;
+
+	protected volatile String confirmedLeaderAddress;
+
+	protected volatile boolean running;
+
+	protected AbstractLeaderElectionService() {
+		leaderContender = null;
+
+		issuedLeaderSessionID = null;
+		confirmedLeaderSessionID = null;
+		confirmedLeaderAddress = null;
+
+		running = false;
+	}
+
+	@Override
+	public final void start(LeaderContender contender) throws Exception {
+		Preconditions.checkNotNull(contender, "Contender must not be null.");
+		Preconditions.checkState(leaderContender == null, "Contender was already set.");
+
+		logger.info("Starting LeaderElectionService {}.", this);
+
+		synchronized (lock) {
+			leaderContender = contender;
+			running = true;
+			internalStart(contender);
+		}
+	}
+
+	@Override
+	public final void stop() throws Exception {
+		synchronized (lock) {
+			if (!running) {
+				return;
+			}
+			running = false;
+			clearConfirmedLeaderInformation();
+		}
+
+		logger.info("Stopping LeaderElectionService {}.", this);
+
+		internalStop();
+	}
+
+	@Override
+	public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
+		if (logger.isDebugEnabled()) {
+			logger.debug(
+				"Confirm leader session ID {} for leader {}.",
+				leaderSessionID,
+				leaderAddress);
+		}
+
+		Preconditions.checkNotNull(leaderSessionID);
+
+		if (checkLeaderLatch()) {

Review comment:
       I think the concept of a 'leader latch' comes from ZooKeeper? Maybe we should abstract it as a common, meaningful interface?
   I noticed the only difference between `checkLeaderLatch` and `hasLeadership` is `leaderSessionId.equals(issuedLeaderSessionID)`. If we adjust the order of the `if`s, would it be possible to get rid of `checkLeaderLatch` and replace it with an abstract `hasLeadership`?
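
A rough sketch of that reordering. Since the hunk is cut off, the real `confirmLeadership` body is not visible here; the sketch assumes that `hasLeadership(id)` combines the backend-specific check (leader latch, ConfigMap lock, ...) with the session ID comparison. The names are hypothetical (`ReorderedLeaderElectionService`), and `issuedLeaderSessionId` is made `protected` only so the sketch can be exercised. Checking the session ID first leaves `hasLeadership` as the only backend-specific call.

```java
import java.util.Objects;
import java.util.UUID;

abstract class ReorderedLeaderElectionService {
    protected final Object lock = new Object();
    protected volatile UUID issuedLeaderSessionId;
    private volatile UUID confirmedLeaderSessionId;
    private volatile String confirmedLeaderAddress;

    /** Backend-specific leadership check combined with the session ID check. */
    public abstract boolean hasLeadership(UUID leaderSessionId);

    public void confirmLeadership(UUID leaderSessionId, String leaderAddress) {
        Objects.requireNonNull(leaderSessionId);
        synchronized (lock) {
            // First: ignore confirmations for a session ID that is not the currently issued one.
            if (!leaderSessionId.equals(issuedLeaderSessionId)) {
                return;
            }
            // Then: only the backend-specific part remains, behind the common hasLeadership contract.
            if (hasLeadership(leaderSessionId)) {
                confirmedLeaderSessionId = leaderSessionId;
                confirmedLeaderAddress = leaderAddress;
            }
        }
    }

    UUID getConfirmedLeaderSessionId() {
        return confirmedLeaderSessionId;
    }

    String getConfirmedLeaderAddress() {
        return confirmedLeaderAddress;
    }
}
```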

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesHighAvailabilityOptions.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.ConfigOption;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to Kubernetes high-availability settings.
+ * All the HA information relevant for a specific component will be stored in a single ConfigMap.
+ * For example, the Dispatcher's ConfigMap would then contain the current leader, the running jobs
+ * and the pointers to the persisted JobGraphs.
+ * The JobManager's ConfigMap would then contain the current leader, the pointers to the checkpoints
+ * and the checkpoint ID counter.
+ *
+ * <p>The ConfigMap name will be created with the following pattern.
+ * e.g. k8s-ha-app1-restserver-leader, k8s-ha-app1-00000000000000000000000000000000-jobmanager-leader
+ */
+@PublicEvolving
+public class KubernetesHighAvailabilityOptions {
+
+	@Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+	public static final ConfigOption<String> HA_KUBERNETES_LEADER_SUFFIX =
+			key("high-availability.kubernetes.leader.suffix")
+			.stringType()
+			.defaultValue("leader")
+			.withDescription("The ConfigMap suffix of the leader which contains the URL to the leader and the " +
+				"current leader session ID. Leader elector will use the same ConfigMap for contending the lock.");
+
+	@Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+	public static final ConfigOption<Duration> KUBERNETES_LEASE_DURATION =
+			key("high-availability.kubernetes.client.lease-duration")

Review comment:
       ```suggestion
   			key("high-availability.kubernetes.leader-election.lease-duration")
   ```

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import org.apache.flink.kubernetes.kubeclient.KubernetesLeaderElectionConfiguration;
+
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.extended.leaderelection.LeaderCallbacks;
+import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfigBuilder;
+import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector;
+import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.ConfigMapLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.kubernetes.utils.Constants.LOCK_IDENTITY;
+
+/**
+ * Represent Leader Elector in kubernetes.
+ */
+public class KubernetesLeaderElector extends LeaderElector<NamespacedKubernetesClient> {

Review comment:
       IIUC, each `run` of the elector ends when the leadership is revoked. To join another round of election, we need to trigger `run` again. It would be better to explain this in the JavaDocs.
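A sketch of the lifecycle the JavaDoc could describe, assuming a hypothetical `OneShotElector` whose `run()` returns once leadership is lost (the per-`run` behaviour described above). Under that assumption, re-joining the election becomes the caller's responsibility, e.g. a loop like the one below, which in production would run on a dedicated executor rather than the calling thread.

```java
/**
 * Hypothetical abstraction of one election round: run() contends for the lock,
 * leads while renewals succeed, and returns once the leadership is lost.
 */
interface OneShotElector {
    void run();
}

/** Keeps starting new election rounds until stop() is called. */
final class ReelectingRunner {
    private final OneShotElector elector;
    private volatile boolean running = true;
    private int rounds;

    ReelectingRunner(OneShotElector elector) {
        this.elector = elector;
    }

    void runUntilStopped() {
        while (running) {
            rounds++;
            // One full round: acquire -> lead -> lose leadership (or fail to acquire).
            elector.run();
        }
    }

    void stop() {
        running = false;
    }

    int rounds() {
        return rounds;
    }
}
```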

##########
File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.KubernetesLeaderElectionConfiguration;
+import org.apache.flink.kubernetes.kubeclient.TestingFlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.runtime.leaderelection.TestingContender;
+import org.apache.flink.runtime.leaderelection.TestingListener;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.RunnableWithException;
+
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link KubernetesLeaderElectionService}.
+ */
+public class KubernetesHighAvailabilityTestBase extends TestLogger {
+
+	private final ExecutorService executorService =
+		Executors.newFixedThreadPool(4, new ExecutorThreadFactory("IO-Executor"));
+	private final Configuration configuration = new Configuration();
+
+	protected static final String CLUSTER_ID = "leader-test-cluster";
+	protected static final String LEADER_URL = "akka.tcp://flink@172.20.1.21:6123/user/rpc/resourcemanager";
+	protected static final long TIMEOUT = 30L * 1000L;
+	protected static final String LEADER_CONFIGMAP_NAME = "k8s-ha-app1-resourcemanager";
+	protected final Map<String, KubernetesConfigMap> configMapStore = new HashMap<>();
+	protected final CompletableFuture<FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap>> configMapsAndDoCallbackFuture =
+		new CompletableFuture<>();
+	protected final CompletableFuture<FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap>> leaderRetrievalConfigMapCallback =
+		new CompletableFuture<>();

Review comment:
       It seems `configMapStore` and the two futures are reused across test cases, which could cause test instability.
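To illustrate why sharing the futures is fragile: a `CompletableFuture` can be completed only once, and later `complete(...)` calls return `false` and leave the first value in place, so a later user of a shared future would silently receive an earlier handler. The helper name below is hypothetical. (JUnit 4 instantiates the test class once per test method, so the risk is largest when such fields are static or shared by several services within the same test.)

```java
import java.util.concurrent.CompletableFuture;

final class SharedFutureDemo {
    /**
     * Tries to complete the same future once per value (as successive users would)
     * and returns the value that readers of the future actually observe.
     */
    static String observedValue(CompletableFuture<String> shared, String... values) {
        if (values.length == 0) {
            throw new IllegalArgumentException("need at least one value, otherwise join() blocks");
        }
        for (String value : values) {
            // complete() returns false and changes nothing once the future is done.
            shared.complete(value);
        }
        return shared.join();
    }
}
```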

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesHighAvailabilityOptions.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.ConfigOption;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to Kubernetes high-availability settings.
+ * All the HA information relevant for a specific component will be stored in a single ConfigMap.
+ * For example, the Dispatcher's ConfigMap would then contain the current leader, the running jobs
+ * and the pointers to the persisted JobGraphs.
+ * The JobManager's ConfigMap would then contain the current leader, the pointers to the checkpoints
+ * and the checkpoint ID counter.
+ *
+ * <p>The ConfigMap name will be created with the following pattern.
+ * e.g. k8s-ha-app1-restserver-leader, k8s-ha-app1-00000000000000000000000000000000-jobmanager-leader
+ */
+@PublicEvolving
+public class KubernetesHighAvailabilityOptions {
+
+	@Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+	public static final ConfigOption<String> HA_KUBERNETES_LEADER_SUFFIX =
+			key("high-availability.kubernetes.leader.suffix")
+			.stringType()
+			.defaultValue("leader")
+			.withDescription("The ConfigMap suffix of the leader which contains the URL to the leader and the " +
+				"current leader session ID. Leader elector will use the same ConfigMap for contending the lock.");
+
+	@Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+	public static final ConfigOption<Duration> KUBERNETES_LEASE_DURATION =
+			key("high-availability.kubernetes.client.lease-duration")
+			.durationType()
+			.defaultValue(Duration.ofSeconds(30))
+			.withDescription("Define the lease duration for the Kubernetes leader election in ms. The leader will " +
+				"continuously renew its lease time to indicate its existence. And the followers will do a lease " +
+				"checking against the current time. \"renewTime + leaseDuration > now\" means the leader is alive.");
+
+	@Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+	public static final ConfigOption<Duration> KUBERNETES_RENEW_DEADLINE =
+			key("high-availability.kubernetes.client.renew-deadline")
+			.durationType()
+			.defaultValue(Duration.ofSeconds(15))
+			.withDescription("Defines the deadline when the leader tries to renew the lease in ms. If it could not " +
+				"succeed in the given time, the renew operation will be aborted.");
+
+	@Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+	public static final ConfigOption<Duration> KUBERNETES_RETRY_PERIOD =
+			key("high-availability.kubernetes.client.retry-period")
+			.durationType()
+			.defaultValue(Duration.ofSeconds(3))

Review comment:
       Would checking the leadership every 3s be too frequent, given that the default lease duration is 30s?
   
   Ideally, once a contender has checked the leadership and learned the remaining lease duration, it makes no sense to check again before the lease can expire. Maybe we could decide when to perform the next check dynamically, based on the remaining lease duration.
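A minimal sketch of the dynamic scheduling idea, assuming a follower can read the leader's last `renewTime` from the lock record; `LeaseCheckScheduler` and `nextCheckDelay` are hypothetical names. The configured retry period is kept as a lower bound. Since the follower compares its own clock against a time written by the leader, clock skew would still have to be accounted for.

```java
import java.time.Duration;
import java.time.Instant;

final class LeaseCheckScheduler {
    /**
     * Computes when a follower should look at the lock again.
     *
     * @param renewTime     last renew time recorded by the current leader
     * @param leaseDuration configured lease duration
     * @param retryPeriod   lower bound, so we never poll more often than the configured period
     * @param now           current time of the follower
     */
    static Duration nextCheckDelay(Instant renewTime, Duration leaseDuration, Duration retryPeriod, Instant now) {
        // The lease can only expire at renewTime + leaseDuration; checking earlier is wasted work.
        Duration untilExpiry = Duration.between(now, renewTime.plus(leaseDuration));
        return untilExpiry.compareTo(retryPeriod) > 0 ? untilExpiry : retryPeriod;
    }
}
```

With the defaults (30s lease, 3s retry period), a follower that sees a renewal 10s ago would wait 20s instead of polling six more times.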

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.util.UUID;
+
+/**
+ * Abstract class for leader election service based on distributed coordination system(e.g. Zookeeper, Kubernetes, etc.).
+ */
+public abstract class AbstractLeaderElectionService implements LeaderElectionService {
+
+	protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+	protected final Object lock = new Object();
+
+	/** The leader contender which applies for leadership. */
+	protected volatile LeaderContender leaderContender;
+
+	private volatile UUID issuedLeaderSessionID;
+
+	protected volatile UUID confirmedLeaderSessionID;
+
+	protected volatile String confirmedLeaderAddress;
+
+	protected volatile boolean running;
+
+	protected AbstractLeaderElectionService() {
+		leaderContender = null;
+
+		issuedLeaderSessionID = null;
+		confirmedLeaderSessionID = null;
+		confirmedLeaderAddress = null;
+
+		running = false;
+	}
+
+	@Override
+	public final void start(LeaderContender contender) throws Exception {
+		Preconditions.checkNotNull(contender, "Contender must not be null.");
+		Preconditions.checkState(leaderContender == null, "Contender was already set.");
+
+		logger.info("Starting LeaderElectionService {}.", this);
+
+		synchronized (lock) {
+			leaderContender = contender;
+			running = true;
+			internalStart(contender);
+		}
+	}
+
+	@Override
+	public final void stop() throws Exception {
+		synchronized (lock) {
+			if (!running) {
+				return;
+			}
+			running = false;
+			clearConfirmedLeaderInformation();
+		}
+
+		logger.info("Stopping LeaderElectionService {}.", this);
+
+		internalStop();
+	}
+
+	@Override
+	public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
+		if (logger.isDebugEnabled()) {
+			logger.debug(
+				"Confirm leader session ID {} for leader {}.",
+				leaderSessionID,
+				leaderAddress);
+		}
+
+		Preconditions.checkNotNull(leaderSessionID);
+
+		if (checkLeaderLatch()) {
+			// check if this is an old confirmation call
+			synchronized (lock) {
+				if (running) {
+					if (leaderSessionID.equals(this.issuedLeaderSessionID)) {
+						confirmLeaderInformation(leaderSessionID, leaderAddress);
+						writeLeaderInformation();
+					}
+				} else {
+					logger.debug("Ignoring the leader session Id {} confirmation, since the " +
+						"LeaderElectionService has already been stopped.", leaderSessionID);
+				}
+			}
+		} else {
+			logger.warn("The leader session ID {} was confirmed even though the " +
+				"corresponding JobManager was not elected as the leader.", leaderSessionID);
+		}
+	}
+
+	@Override
+	public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
+		return checkLeaderLatch() && leaderSessionId.equals(issuedLeaderSessionID);
+	}
+
+	/**
+	 * Returns the current leader session ID or null, if the contender is not the leader.
+	 *
+	 * @return The last leader session ID or null, if the contender is not the leader
+	 */
+	public UUID getLeaderSessionID() {
+		return confirmedLeaderSessionID;
+	}
+
+	protected abstract void internalStart(LeaderContender contender) throws Exception;
+
+	protected abstract void internalStop() throws Exception;
+
+	protected abstract void writeLeaderInformation();
+
+	protected abstract boolean checkLeaderLatch();

Review comment:
       It would be nice to add JavaDocs for these abstract methods, since they are meant to be implemented by the various leader election services.
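One possible wording for those JavaDocs, derived only from how `start`, `stop`, `confirmLeadership` and `hasLeadership` call the hooks in the diff above. `ContenderStub` and `LeaderElectionHooksSketch` are hypothetical stand-ins that keep the sketch self-contained; the real signatures use `LeaderContender`.

```java
/** Hypothetical stand-in for Flink's LeaderContender. */
interface ContenderStub {
}

abstract class LeaderElectionHooksSketch {

    /**
     * Starts participating in the leader election of the underlying system
     * (e.g. a ZooKeeper latch or a Kubernetes leader elector) on behalf of the contender.
     * Called once from start(), while holding the service lock.
     */
    protected abstract void internalStart(ContenderStub contender) throws Exception;

    /**
     * Stops participating in the election and releases all backend resources.
     * Called from stop() outside the lock, after the service was marked as not running.
     */
    protected abstract void internalStop() throws Exception;

    /**
     * Persists the confirmed leader session ID and address to the underlying system
     * so that leader retrieval services can read them.
     * Called from confirmLeadership() while holding the service lock.
     */
    protected abstract void writeLeaderInformation();

    /**
     * Returns whether this instance currently holds the leadership according to the
     * underlying system, independent of any leader session ID.
     * May be called without the service lock, so implementations must be thread-safe.
     */
    protected abstract boolean checkLeaderLatch();
}
```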

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesHighAvailabilityOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.KubernetesLeaderElectionConfiguration;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.blob.BlobStore;
+import org.apache.flink.runtime.blob.BlobStoreService;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneRunningJobsRegistry;
+import org.apache.flink.runtime.jobmanager.JobGraphStore;
+import org.apache.flink.runtime.jobmanager.StandaloneJobGraphStore;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
+import static org.apache.flink.kubernetes.utils.Constants.NAME_SEPARATOR;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * High availability service for Kubernetes.
+ */
+public class KubernetesHaServices implements HighAvailabilityServices {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KubernetesHaServices.class);
+
+	private static final String RESOURCE_MANAGER_NAME = "resourcemanager";
+
+	private static final String DISPATCHER_NAME = "dispatcher";
+
+	private static final String JOB_MANAGER_NAME = "jobmanager";
+
+	private static final String REST_SERVER_NAME = "restserver";
+
+	private final String leaderSuffix;
+
+	private final String clusterId;
+
+	/** Kubernetes client. */
+	private final FlinkKubeClient kubeClient;
+
+	/** The executor to run Kubernetes operations on. */
+	private final Executor executor;
+
+	/** The runtime configuration. */
+	private final Configuration configuration;
+
+	/** Store for arbitrary blobs. */
+	private final BlobStoreService blobStoreService;
+
+	/** The Kubernetes based running jobs registry. */
+	private final RunningJobsRegistry runningJobsRegistry;

Review comment:
       These fields are common with `ZooKeeperHaServices`. Would it make sense to move them into an `AbstractHaServices` base class?
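A sketch of what such a base class could look like, using a template method for the backend-specific part. All type names here are hypothetical simplifications (the configuration is reduced to a `Map`, the election service to its lock name), and the `<clusterId>-<component>-<suffix>` naming is only assumed from the ConfigMap examples in `KubernetesHighAvailabilityOptions`.

```java
import java.util.Map;
import java.util.concurrent.Executor;

/** Hypothetical stand-in for LeaderElectionService: only exposes the lock it contends for. */
interface ElectionServiceStub {
    String lockName();
}

/** Backend-independent fields and logic shared by the ZooKeeper and Kubernetes variants. */
abstract class AbstractHaServicesSketch {
    protected final Executor executor;
    protected final Map<String, String> configuration;
    // blobStoreService and runningJobsRegistry could live here as well.

    protected AbstractHaServicesSketch(Executor executor, Map<String, String> configuration) {
        this.executor = executor;
        this.configuration = configuration;
    }

    public final ElectionServiceStub getResourceManagerLeaderElectionService() {
        return createLeaderElectionService("resourcemanager");
    }

    public final ElectionServiceStub getDispatcherLeaderElectionService() {
        return createLeaderElectionService("dispatcher");
    }

    /** Backend-specific: a ZooKeeper path or a Kubernetes ConfigMap name. */
    protected abstract ElectionServiceStub createLeaderElectionService(String componentName);
}

final class KubernetesHaServicesSketch extends AbstractHaServicesSketch {
    private final String clusterId;
    private final String leaderSuffix;

    KubernetesHaServicesSketch(Executor executor, Map<String, String> configuration,
            String clusterId, String leaderSuffix) {
        super(executor, configuration);
        this.clusterId = clusterId;
        this.leaderSuffix = leaderSuffix;
    }

    @Override
    protected ElectionServiceStub createLeaderElectionService(String componentName) {
        // Assumed naming scheme: <clusterId>-<component>-<suffix>
        final String configMapName = clusterId + "-" + componentName + "-" + leaderSuffix;
        return () -> configMapName;
    }
}
```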

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
##########
@@ -219,6 +230,68 @@ public KubernetesWatch watchPodsAndDoCallback(
 				.watch(new KubernetesPodsWatcher(podCallbackHandler)));
 	}
 
+	@Override
+	public CompletableFuture<Void> createConfigMap(KubernetesConfigMap configMap) {
+		return CompletableFuture.runAsync(
+			() -> {
+				if (!getConfigMap(configMap.getName()).isPresent()) {
+					this.internalClient.configMaps().create(configMap.getInternalResource());
+				}

Review comment:
       The existence check and the creation are not atomic. What happens if another client creates the ConfigMap in between? Does the creation fail, or does it overwrite the existing one? If it fails, where is the exception handled?
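For reference, the Kubernetes API server rejects a create for an already existing name with HTTP 409 Conflict (reason `AlreadyExists`) instead of overwriting it. So one option is to drop the pre-check and treat 409 as "someone else created it"; in fabric8 this presumably surfaces as a `KubernetesClientException` whose `getCode()` is 409. Also note that an exception thrown inside `CompletableFuture.runAsync` completes the returned future exceptionally, so callers would need to handle it there. The sketch below simulates the server with hypothetical in-memory types (`FakeConfigMapApi`, `ApiException`) so the pattern can run on its own.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical exception mirroring an HTTP error returned by the API server. */
class ApiException extends RuntimeException {
    final int code;

    ApiException(int code, String message) {
        super(message);
        this.code = code;
    }
}

/** In-memory stand-in for the API server: create is atomic and fails with 409 if the name exists. */
final class FakeConfigMapApi {
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    void create(String name, String data) {
        if (store.putIfAbsent(name, data) != null) {
            throw new ApiException(409, "configmaps \"" + name + "\" already exists");
        }
    }

    String get(String name) {
        return store.get(name);
    }
}

final class ConfigMapCreator {
    private static final int HTTP_CONFLICT = 409;

    /** Returns true if this call created the ConfigMap, false if it already existed. */
    static boolean createIfAbsent(FakeConfigMapApi api, String name, String data) {
        try {
            // No separate existence check: the server decides atomically.
            api.create(name, data);
            return true;
        } catch (ApiException e) {
            if (e.code == HTTP_CONFLICT) {
                // Another client won the race; keep its ConfigMap untouched.
                return false;
            }
            // Any other error propagates to the caller (e.g. fails the returned future).
            throw e;
        }
    }
}
```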

##########
File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
##########
@@ -249,4 +265,58 @@ public void testStopAndCleanupCluster() throws Exception {
 		this.flinkKubeClient.stopAndCleanupCluster(CLUSTER_ID);
 		assertTrue(this.kubeClient.apps().deployments().inNamespace(NAMESPACE).list().getItems().isEmpty());
 	}
+
+	@Test
+	public void testCreateAndDeleteConfigMap() {
+		this.flinkKubeClient.createConfigMap(buildHAConfigMap());
+		assertThat(this.flinkKubeClient.getConfigMap(LEADER_CONFIG_MAP_NAME).isPresent(), is(true));
+		this.flinkKubeClient.deleteConfigMapsByLabels(haLabels);
+		assertThat(this.flinkKubeClient.getConfigMap(LEADER_CONFIG_MAP_NAME).isPresent(), is(false));
+	}
+
+	@Test
+	public void testCheckAndUpdateConfigMap() throws Exception {
+		this.flinkKubeClient.createConfigMap(buildHAConfigMap());
+
+		final Supplier<Exception> configMapNotExistException = () -> new Exception("ConfigMap not exist");
+		FunctionWithException<KubernetesConfigMap, KubernetesConfigMap, ?> function = c -> {
+			c.getData().put(LEADER_ADDRESS_KEY, LEADER_ADDRESS_NEW);
+			return c;
+		};
+		this.flinkKubeClient.getConfigMap(LEADER_CONFIG_MAP_NAME).map(
+			configMap -> {
+				assertThat(configMap.getData().get(LEADER_ADDRESS_KEY), is(LEADER_ADDRESS));
+				return configMap;
+			}
+		).orElseThrow(configMapNotExistException);
+
+		// Checker not pass
+		this.flinkKubeClient.checkAndUpdateConfigMap(LEADER_CONFIG_MAP_NAME, c -> false, function).get();
+		this.flinkKubeClient.getConfigMap(LEADER_CONFIG_MAP_NAME).map(
+			configMap -> {
+				assertThat(configMap.getData().get(LEADER_ADDRESS_KEY), is(LEADER_ADDRESS));
+				return configMap;
+			}
+		).orElseThrow(configMapNotExistException);
+
+		// Checker pass
+		this.flinkKubeClient.checkAndUpdateConfigMap(LEADER_CONFIG_MAP_NAME, c -> true, function).get();
+		this.flinkKubeClient.getConfigMap(LEADER_CONFIG_MAP_NAME).map(
+			configMap -> {
+				assertThat(configMap.getData().get(LEADER_ADDRESS_KEY), is(LEADER_ADDRESS_NEW));
+				return configMap;
+			}
+		).orElseThrow(configMapNotExistException);
+	}
+
+	private KubernetesConfigMap buildHAConfigMap() {
+		final Map<String, String> data = new HashMap<>();
+		data.put(LEADER_ADDRESS_KEY, LEADER_ADDRESS);
+		return new KubernetesConfigMap(new ConfigMapBuilder()
+			.withNewMetadata()
+			.withName(LEADER_CONFIG_MAP_NAME)
+			.withLabels(haLabels)
+			.endMetadata()
+			.withData(data).build());
+	}

Review comment:
       I would suggest using an arbitrary test ConfigMap for testing the client. The ConfigMap interfaces should work correctly regardless of whether the ConfigMap is used for HA or not.

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesHighAvailabilityOptions.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.ConfigOption;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to Kubernetes high-availability settings.
+ * All the HA information relevant for a specific component will be stored in a single ConfigMap.
+ * For example, the Dispatcher's ConfigMap would then contain the current leader, the running jobs
+ * and the pointers to the persisted JobGraphs.
+ * The JobManager's ConfigMap would then contain the current leader, the pointers to the checkpoints
+ * and the checkpoint ID counter.
+ *
+ * <p>The ConfigMap name will be created with the following pattern.
+ * e.g. k8s-ha-app1-restserver-leader, k8s-ha-app1-00000000000000000000000000000000-jobmanager-leader
+ */
+@PublicEvolving
+public class KubernetesHighAvailabilityOptions {
+
+	@Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+	public static final ConfigOption<String> HA_KUBERNETES_LEADER_SUFFIX =
+			key("high-availability.kubernetes.leader.suffix")
+			.stringType()
+			.defaultValue("leader")
+			.withDescription("The ConfigMap suffix of the leader which contains the URL to the leader and the " +
+				"current leader session ID. Leader elector will use the same ConfigMap for contending the lock.");
+
+	@Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+	public static final ConfigOption<Duration> KUBERNETES_LEASE_DURATION =
+			key("high-availability.kubernetes.client.lease-duration")
+			.durationType()
+			.defaultValue(Duration.ofSeconds(30))
+			.withDescription("Define the lease duration for the Kubernetes leader election in ms. The leader will " +
+				"continuously renew its lease time to indicate its existence. And the followers will do a lease " +
+				"checking against the current time. \"renewTime + leaseDuration > now\" means the leader is alive.");
+
+	@Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+	public static final ConfigOption<Duration> KUBERNETES_RENEW_DEADLINE =
+			key("high-availability.kubernetes.client.renew-deadline")
+			.durationType()
+			.defaultValue(Duration.ofSeconds(15))

Review comment:
       I think the default 15s renew deadline does not match the default 30s lease duration. It means that after the leader gives up the leadership, another 15s pass before another contender can become the leader. During these 15s, there is practically no leader.
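The arithmetic behind this, as a sketch under a simplified model (assumed here, following the usual client-go style semantics): the leader keeps retrying its renewal for at most `renewDeadline` after the last successful renewal and then gives up, while followers may only take over once `leaseDuration` has passed since that renewal. `LeaderGapEstimate` is a hypothetical helper that ignores retry-period granularity and clock skew.

```java
import java.time.Duration;

final class LeaderGapEstimate {
    /**
     * Worst-case time without an active leader if the current leader stops renewing:
     * it gives up after renewDeadline, but followers may only take over once
     * leaseDuration has passed since the last successful renewal.
     */
    static Duration leaderlessWindow(Duration leaseDuration, Duration renewDeadline) {
        // renewDeadline >= leaseDuration would let the old leader still act after a follower took over.
        if (renewDeadline.compareTo(leaseDuration) >= 0) {
            throw new IllegalArgumentException("renewDeadline must be shorter than leaseDuration");
        }
        return leaseDuration.minus(renewDeadline);
    }
}
```

With the defaults, `leaderlessWindow(Duration.ofSeconds(30), Duration.ofSeconds(15))` yields 15s; narrowing the gap means moving `renewDeadline` closer to (but still below) `leaseDuration`.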

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServicesFactory.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.kubeclient.KubeClientFactory;
+import org.apache.flink.runtime.blob.BlobUtils;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Factory for creating Kubernetes high availability services.
+ */
+public class KubernetesHaServicesFactory implements HighAvailabilityServicesFactory {
+
+	@Override
+	public HighAvailabilityServices createHAServices(Configuration configuration, Executor executor) throws Exception {
+		return new KubernetesHaServices(
+			KubeClientFactory.fromConfiguration(configuration),

Review comment:
       IIUC, this means we create two `KubeClient`s in the JobManager process. Would that be a problem?

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesHighAvailabilityOptions.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.ConfigOption;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to Kubernetes high-availability settings.
+ * All the HA information relevant for a specific component will be stored in a single ConfigMap.
+ * For example, the Dispatcher's ConfigMap would then contain the current leader, the running jobs
+ * and the pointers to the persisted JobGraphs.
+ * The JobManager's ConfigMap would then contain the current leader, the pointers to the checkpoints
+ * and the checkpoint ID counter.
+ *
+ * <p>The ConfigMap name will be created with the following pattern.
+ * e.g. k8s-ha-app1-restserver-leader, k8s-ha-app1-00000000000000000000000000000000-jobmanager-leader
+ */
+@PublicEvolving
+public class KubernetesHighAvailabilityOptions {
+
+	@Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+	public static final ConfigOption<String> HA_KUBERNETES_LEADER_SUFFIX =
+			key("high-availability.kubernetes.leader.suffix")
+			.stringType()
+			.defaultValue("leader")
+			.withDescription("The ConfigMap suffix of the leader which contains the URL to the leader and the " +
+				"current leader session ID. Leader elector will use the same ConfigMap for contending the lock.");

Review comment:
       Any reason to make this suffix configurable?

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
##########
@@ -106,13 +110,37 @@ public static String getDeploymentName(String clusterId) {
 	 * @return Task manager labels.
 	 */
 	public static Map<String, String> getTaskManagerLabels(String clusterId) {
-		final Map<String, String> labels = new HashMap<>();
-		labels.put(Constants.LABEL_TYPE_KEY, Constants.LABEL_TYPE_NATIVE_TYPE);
-		labels.put(Constants.LABEL_APP_KEY, clusterId);
+		final Map<String, String> labels = new HashMap<>(getCommonLabels(clusterId));
 		labels.put(Constants.LABEL_COMPONENT_KEY, Constants.LABEL_COMPONENT_TASK_MANAGER);
 		return Collections.unmodifiableMap(labels);
 	}
 
+	/**
+	 * Get the common labels for Flink native clusters. All the Kubernetes resources will be set with these labels.
+	 *
+	 * @param clusterId cluster id
+	 * @return Return common labels map
+	 */
+	public static Map<String, String> getCommonLabels(String clusterId) {
+		Map<String, String> commonLabels = new HashMap<>();
+		commonLabels.put(Constants.LABEL_TYPE_KEY, Constants.LABEL_TYPE_NATIVE_TYPE);
+		commonLabels.put(Constants.LABEL_APP_KEY, clusterId);
+
+		return Collections.unmodifiableMap(commonLabels);

Review comment:
       Why return an unmodifiable map? The returned map is a new instance created in this method, so there should be no other reference to it apart from the one returned by this method.
   
   Returning a modifiable map would also save us from converting back to a modifiable map in `getTaskManagerLabels` and `getConfigMapLabels`.
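Here is a minimal sketch of the suggested shape. `getCommonLabels` hands out a fresh, modifiable map, and the caller wraps it exactly once at the public boundary. The label keys and values below are placeholders standing in for `Constants.LABEL_*`. They are assumptions, not Flink's actual constant values.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class LabelsSketch {

	// Placeholder keys/values standing in for Constants.LABEL_* (assumed for illustration).
	static final String LABEL_TYPE_KEY = "type";
	static final String LABEL_TYPE_NATIVE_TYPE = "native";
	static final String LABEL_APP_KEY = "app";
	static final String LABEL_COMPONENT_KEY = "component";
	static final String LABEL_COMPONENT_TASK_MANAGER = "taskmanager";

	private LabelsSketch() {
	}

	/** Returns a new, modifiable map; nobody else holds a reference to it. */
	static Map<String, String> getCommonLabels(String clusterId) {
		final Map<String, String> commonLabels = new HashMap<>();
		commonLabels.put(LABEL_TYPE_KEY, LABEL_TYPE_NATIVE_TYPE);
		commonLabels.put(LABEL_APP_KEY, clusterId);
		return commonLabels;
	}

	/** Extends the common labels directly, without copying them into a new HashMap first. */
	static Map<String, String> getTaskManagerLabels(String clusterId) {
		final Map<String, String> labels = getCommonLabels(clusterId);
		labels.put(LABEL_COMPONENT_KEY, LABEL_COMPONENT_TASK_MANAGER);
		return Collections.unmodifiableMap(labels);
	}
}
```

`getConfigMapLabels` could follow the same pattern with its own component/type label.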

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
##########
@@ -80,4 +84,14 @@
 	public static final String RESTART_POLICY_OF_NEVER = "Never";
 
 	public static final String NATIVE_KUBERNETES_COMMAND = "native-k8s";
+
+	// Constants for Kubernetes high availability
+	public static final String LEADER_ADDRESS_KEY = "address";
+	public static final String LEADER_SESSION_ID_KEY = "sessionId";
+	public static final String CHECKPOINT_COUNTER_KEY = "counter";
+	public static final String RUNNING_JOBS_REGISTRY_KEY_PREFIX = "runningJobsRegistry";
+	public static final String JOB_GRAPH_STORE_KEY_PREFIX = "jobGraph";
+
+	public static final String LOCK_IDENTITY = UUID.randomUUID().toString();

Review comment:
       This is not a typical "constant". It is expected to be different for each process. We'd better move it somewhere else (e.g., `KubernetesLeaderElectionService`) and initialize it as a non-static field.
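A minimal sketch of the suggestion follows. The identity is generated per service instance instead of being a static field. `LeaderElectionServiceSketch` is a hypothetical stand-in for `KubernetesLeaderElectionService`. The injectable constructor is an added assumption that makes tests deterministic.

```java
import java.util.UUID;

class LeaderElectionServiceSketch {

	// Generated per instance: every service instance (and therefore every process)
	// contends for the ConfigMap lock with its own identity.
	private final String lockIdentity;

	LeaderElectionServiceSketch() {
		this(UUID.randomUUID().toString());
	}

	// Allows tests to inject a fixed identity.
	LeaderElectionServiceSketch(String lockIdentity) {
		this.lockIdentity = lockIdentity;
	}

	String getLockIdentity() {
		return lockIdentity;
	}
}
```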

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
##########
@@ -115,11 +115,7 @@ public String getImage() {
 
 	@Override
 	public Map<String, String> getCommonLabels() {
-		Map<String, String> commonLabels = new HashMap<>();
-		commonLabels.put(Constants.LABEL_TYPE_KEY, Constants.LABEL_TYPE_NATIVE_TYPE);
-		commonLabels.put(Constants.LABEL_APP_KEY, getClusterId());
-
-		return Collections.unmodifiableMap(commonLabels);
+		return KubernetesUtils.getCommonLabels(getClusterId());

Review comment:
       It seems this change does not belong in this commit?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.util.UUID;
+
+/**
+ * Abstract class for leader election service based on distributed coordination system(e.g. Zookeeper, Kubernetes, etc.).
+ */
+public abstract class AbstractLeaderElectionService implements LeaderElectionService {
+
+	protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+	protected final Object lock = new Object();

Review comment:
       It was not introduced by this PR, but it might be better to document which internal states are protected by this lock.
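The following sketch shows what such documentation could look like. `confirmedLeaderSessionID` and `confirmedLeaderAddress` appear in the subclass in this PR. `issuedLeaderSessionID`, the method bodies and the class name are assumptions chosen for illustration.

```java
import java.util.UUID;

class GuardedStateSketch {

	/**
	 * Guards all mutable leader state below. Every read or write of these
	 * fields must happen while holding this lock.
	 */
	protected final Object lock = new Object();

	/** Session ID issued to the contender on grant; guarded by {@link #lock}. */
	protected UUID issuedLeaderSessionID;

	/** Session ID confirmed by the contender; guarded by {@link #lock}. */
	protected UUID confirmedLeaderSessionID;

	/** Address confirmed by the contender; guarded by {@link #lock}. */
	protected String confirmedLeaderAddress;

	void grantLeadership() {
		synchronized (lock) {
			issuedLeaderSessionID = UUID.randomUUID();
		}
	}

	/** Returns false for a confirmation that does not match the issued session ID. */
	boolean confirmLeadership(UUID sessionId, String address) {
		synchronized (lock) {
			if (!sessionId.equals(issuedLeaderSessionID)) {
				return false;
			}
			confirmedLeaderSessionID = sessionId;
			confirmedLeaderAddress = address;
			return true;
		}
	}
}
```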

##########
File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
##########
@@ -249,4 +265,58 @@ public void testStopAndCleanupCluster() throws Exception {
 		this.flinkKubeClient.stopAndCleanupCluster(CLUSTER_ID);
 		assertTrue(this.kubeClient.apps().deployments().inNamespace(NAMESPACE).list().getItems().isEmpty());
 	}
+
+	@Test
+	public void testCreateAndDeleteConfigMap() {
+		this.flinkKubeClient.createConfigMap(buildHAConfigMap());
+		assertThat(this.flinkKubeClient.getConfigMap(LEADER_CONFIG_MAP_NAME).isPresent(), is(true));
+		this.flinkKubeClient.deleteConfigMapsByLabels(haLabels);
+		assertThat(this.flinkKubeClient.getConfigMap(LEADER_CONFIG_MAP_NAME).isPresent(), is(false));
+	}

Review comment:
       It would be better to split this into two test cases, each with a single purpose.
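Here is a rough sketch of the split, with one behavior per test. An in-memory map stands in for the Kubernetes client. `FakeConfigMapClient` is hypothetical, and it deletes by name rather than by labels to stay short. Plain `AssertionError`s replace JUnit/Hamcrest so the snippet runs on its own.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical in-memory stand-in for FlinkKubeClient's ConfigMap operations.
class FakeConfigMapClient {
	private final Map<String, Map<String, String>> configMaps = new HashMap<>();

	void createConfigMap(String name, Map<String, String> data) {
		configMaps.put(name, new HashMap<>(data));
	}

	Optional<Map<String, String>> getConfigMap(String name) {
		return Optional.ofNullable(configMaps.get(name));
	}

	void deleteConfigMap(String name) {
		configMaps.remove(name);
	}
}

class ConfigMapTestsSketch {
	static final String LEADER_CONFIG_MAP_NAME = "leader-config-map";

	// Purpose 1: creating a ConfigMap makes it retrievable.
	static void testCreateConfigMap() {
		final FakeConfigMapClient client = new FakeConfigMapClient();
		client.createConfigMap(LEADER_CONFIG_MAP_NAME, new HashMap<>());
		check(client.getConfigMap(LEADER_CONFIG_MAP_NAME).isPresent(), "ConfigMap should exist");
	}

	// Purpose 2: deleting removes it; creation is only setup here, not what is asserted.
	static void testDeleteConfigMap() {
		final FakeConfigMapClient client = new FakeConfigMapClient();
		client.createConfigMap(LEADER_CONFIG_MAP_NAME, new HashMap<>());
		client.deleteConfigMap(LEADER_CONFIG_MAP_NAME);
		check(!client.getConfigMap(LEADER_CONFIG_MAP_NAME).isPresent(), "ConfigMap should be deleted");
	}

	private static void check(boolean condition, String message) {
		if (!condition) {
			throw new AssertionError(message);
		}
	}
}
```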

##########
File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
##########
@@ -249,4 +265,58 @@ public void testStopAndCleanupCluster() throws Exception {
 		this.flinkKubeClient.stopAndCleanupCluster(CLUSTER_ID);
 		assertTrue(this.kubeClient.apps().deployments().inNamespace(NAMESPACE).list().getItems().isEmpty());
 	}
+
+	@Test
+	public void testCreateAndDeleteConfigMap() {
+		this.flinkKubeClient.createConfigMap(buildHAConfigMap());
+		assertThat(this.flinkKubeClient.getConfigMap(LEADER_CONFIG_MAP_NAME).isPresent(), is(true));
+		this.flinkKubeClient.deleteConfigMapsByLabels(haLabels);
+		assertThat(this.flinkKubeClient.getConfigMap(LEADER_CONFIG_MAP_NAME).isPresent(), is(false));
+	}
+
+	@Test
+	public void testCheckAndUpdateConfigMap() throws Exception {
+		this.flinkKubeClient.createConfigMap(buildHAConfigMap());
+
+		final Supplier<Exception> configMapNotExistException = () -> new Exception("ConfigMap not exist");
+		FunctionWithException<KubernetesConfigMap, KubernetesConfigMap, ?> function = c -> {
+			c.getData().put(LEADER_ADDRESS_KEY, LEADER_ADDRESS_NEW);
+			return c;
+		};
+		this.flinkKubeClient.getConfigMap(LEADER_CONFIG_MAP_NAME).map(
+			configMap -> {
+				assertThat(configMap.getData().get(LEADER_ADDRESS_KEY), is(LEADER_ADDRESS));
+				return configMap;
+			}
+		).orElseThrow(configMapNotExistException);
+
+		// Checker not pass
+		this.flinkKubeClient.checkAndUpdateConfigMap(LEADER_CONFIG_MAP_NAME, c -> false, function).get();
+		this.flinkKubeClient.getConfigMap(LEADER_CONFIG_MAP_NAME).map(
+			configMap -> {
+				assertThat(configMap.getData().get(LEADER_ADDRESS_KEY), is(LEADER_ADDRESS));
+				return configMap;
+			}
+		).orElseThrow(configMapNotExistException);
+
+		// Checker pass
+		this.flinkKubeClient.checkAndUpdateConfigMap(LEADER_CONFIG_MAP_NAME, c -> true, function).get();
+		this.flinkKubeClient.getConfigMap(LEADER_CONFIG_MAP_NAME).map(
+			configMap -> {
+				assertThat(configMap.getData().get(LEADER_ADDRESS_KEY), is(LEADER_ADDRESS_NEW));
+				return configMap;
+			}
+		).orElseThrow(configMapNotExistException);
+	}

Review comment:
       1. Let's separate this into 2 test cases.
   2. Instead of checking the existence with `orElseThrow`, I think asserting `Optional#isPresent` would be more readable, for all 3 occurrences.
   ```
   final Optional<KubernetesConfigMap> configMapOpt = flinkKubeClient.getConfigMap(LEADER_CONFIG_MAP_NAME);
   assertThat(configMapOpt.isPresent(), is(true));
   assertThat(configMapOpt.get().getData().get(LEADER_ADDRESS_KEY), is(xxx));
   ```
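A self-contained sketch of both points follows: one test for a failing checker, one for a passing checker, and an `isPresent` assertion before reading the data. `FakeHaClient` is hypothetical. So are its `checkAndUpdateConfigMap` returning `CompletableFuture<Boolean>` and the placeholder addresses. None of these reflect the real `FlinkKubeClient` signature.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical in-memory client; not the real FlinkKubeClient API.
class FakeHaClient {
	private final Map<String, Map<String, String>> configMaps = new HashMap<>();

	void createConfigMap(String name, Map<String, String> data) {
		configMaps.put(name, new HashMap<>(data));
	}

	Optional<Map<String, String>> getConfigMap(String name) {
		return Optional.ofNullable(configMaps.get(name));
	}

	// Applies the update only if the checker accepts the current data;
	// completes with whether the update was applied.
	CompletableFuture<Boolean> checkAndUpdateConfigMap(
			String name,
			Predicate<Map<String, String>> checker,
			Function<Map<String, String>, Map<String, String>> update) {
		final Map<String, String> current = configMaps.get(name);
		if (current == null || !checker.test(current)) {
			return CompletableFuture.completedFuture(false);
		}
		configMaps.put(name, update.apply(new HashMap<>(current)));
		return CompletableFuture.completedFuture(true);
	}
}

class CheckAndUpdateTestsSketch {
	static final String NAME = "leader-config-map";
	static final String KEY = "address";
	static final String OLD = "akka://old";
	static final String NEW = "akka://new";

	static FakeHaClient clientWithLeader() {
		final FakeHaClient client = new FakeHaClient();
		final Map<String, String> data = new HashMap<>();
		data.put(KEY, OLD);
		client.createConfigMap(NAME, data);
		return client;
	}

	static Map<String, String> setNewAddress(Map<String, String> data) {
		data.put(KEY, NEW);
		return data;
	}

	static void testCheckAndUpdateConfigMapWhenCheckerFails() throws Exception {
		final FakeHaClient client = clientWithLeader();
		client.checkAndUpdateConfigMap(NAME, data -> false, CheckAndUpdateTestsSketch::setNewAddress).get();
		assertAddress(client, OLD);
	}

	static void testCheckAndUpdateConfigMapWhenCheckerPasses() throws Exception {
		final FakeHaClient client = clientWithLeader();
		client.checkAndUpdateConfigMap(NAME, data -> true, CheckAndUpdateTestsSketch::setNewAddress).get();
		assertAddress(client, NEW);
	}

	// Assert presence explicitly via Optional#isPresent, then check the content.
	private static void assertAddress(FakeHaClient client, String expected) {
		final Optional<Map<String, String>> configMapOpt = client.getConfigMap(NAME);
		if (!configMapOpt.isPresent()) {
			throw new AssertionError("ConfigMap " + NAME + " should exist");
		}
		final String actual = configMapOpt.get().get(KEY);
		if (!expected.equals(actual)) {
			throw new AssertionError("Expected " + expected + " but was " + actual);
		}
	}
}
```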

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesHighAvailabilityOptions.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.ConfigOption;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to Kubernetes high-availability settings.
+ * All the HA information relevant for a specific component will be stored in a single ConfigMap.
+ * For example, the Dispatcher's ConfigMap would then contain the current leader, the running jobs
+ * and the pointers to the persisted JobGraphs.
+ * The JobManager's ConfigMap would then contain the current leader, the pointers to the checkpoints
+ * and the checkpoint ID counter.
+ *
+ * <p>The ConfigMap name will be created with the following pattern.
+ * e.g. k8s-ha-app1-restserver-leader, k8s-ha-app1-00000000000000000000000000000000-jobmanager-leader
+ */
+@PublicEvolving
+public class KubernetesHighAvailabilityOptions {
+
+	@Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+	public static final ConfigOption<String> HA_KUBERNETES_LEADER_SUFFIX =
+			key("high-availability.kubernetes.leader.suffix")
+			.stringType()
+			.defaultValue("leader")
+			.withDescription("The ConfigMap suffix of the leader which contains the URL to the leader and the " +
+				"current leader session ID. Leader elector will use the same ConfigMap for contending the lock.");
+
+	@Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+	public static final ConfigOption<Duration> KUBERNETES_LEASE_DURATION =
+			key("high-availability.kubernetes.client.lease-duration")
+			.durationType()
+			.defaultValue(Duration.ofSeconds(30))
+			.withDescription("Define the lease duration for the Kubernetes leader election in ms. The leader will " +
+				"continuously renew its lease time to indicate its existence. And the followers will do a lease " +
+				"checking against the current time. \"renewTime + leaseDuration > now\" means the leader is alive.");
+
+	@Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+	public static final ConfigOption<Duration> KUBERNETES_RENEW_DEADLINE =
+			key("high-availability.kubernetes.client.renew-deadline")

Review comment:
       ```suggestion
   			key("high-availability.kubernetes.leader-election.renew-deadline")
   ```

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesHighAvailabilityOptions.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.ConfigOption;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to Kubernetes high-availability settings.
+ * All the HA information relevant for a specific component will be stored in a single ConfigMap.
+ * For example, the Dispatcher's ConfigMap would then contain the current leader, the running jobs
+ * and the pointers to the persisted JobGraphs.
+ * The JobManager's ConfigMap would then contain the current leader, the pointers to the checkpoints
+ * and the checkpoint ID counter.
+ *
+ * <p>The ConfigMap name will be created with the following pattern.
+ * e.g. k8s-ha-app1-restserver-leader, k8s-ha-app1-00000000000000000000000000000000-jobmanager-leader
+ */
+@PublicEvolving
+public class KubernetesHighAvailabilityOptions {
+
+	@Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+	public static final ConfigOption<String> HA_KUBERNETES_LEADER_SUFFIX =
+			key("high-availability.kubernetes.leader.suffix")
+			.stringType()
+			.defaultValue("leader")
+			.withDescription("The ConfigMap suffix of the leader which contains the URL to the leader and the " +
+				"current leader session ID. Leader elector will use the same ConfigMap for contending the lock.");
+
+	@Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+	public static final ConfigOption<Duration> KUBERNETES_LEASE_DURATION =
+			key("high-availability.kubernetes.client.lease-duration")
+			.durationType()
+			.defaultValue(Duration.ofSeconds(30))
+			.withDescription("Define the lease duration for the Kubernetes leader election in ms. The leader will " +
+				"continuously renew its lease time to indicate its existence. And the followers will do a lease " +
+				"checking against the current time. \"renewTime + leaseDuration > now\" means the leader is alive.");
+
+	@Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+	public static final ConfigOption<Duration> KUBERNETES_RENEW_DEADLINE =
+			key("high-availability.kubernetes.client.renew-deadline")
+			.durationType()
+			.defaultValue(Duration.ofSeconds(15))
+			.withDescription("Defines the deadline when the leader tries to renew the lease in ms. If it could not " +
+				"succeed in the given time, the renew operation will be aborted.");

Review comment:
       It's a bit confusing what "renew operation will be aborted" means. I think we should explain that the leader will give up its leadership if it cannot successfully renew the lease within this time.
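To make the two timing rules concrete, the sketch below encodes them using the defaults from this diff (30 s lease duration, 15 s renew deadline). The class and method names are hypothetical, and the actual leader elector may implement the checks differently. The constructor rejects a renew deadline that is not shorter than the lease duration. That constraint is an assumption here, motivated by the leader needing to step down before followers consider its lease expired.

```java
import java.time.Duration;
import java.time.Instant;

final class LeaseTimingSketch {

	private final Duration leaseDuration;
	private final Duration renewDeadline;

	LeaseTimingSketch(Duration leaseDuration, Duration renewDeadline) {
		// If the leader could keep its role longer than the lease, a follower might take
		// over while the old leader still acts as leader.
		if (renewDeadline.compareTo(leaseDuration) >= 0) {
			throw new IllegalArgumentException("renewDeadline must be shorter than leaseDuration");
		}
		this.leaseDuration = leaseDuration;
		this.renewDeadline = renewDeadline;
	}

	/** Follower view: the leader is alive while renewTime + leaseDuration > now. */
	boolean isLeaderAlive(Instant lastRenewTime, Instant now) {
		return lastRenewTime.plus(leaseDuration).isAfter(now);
	}

	/** Leader view: with no successful renewal within renewDeadline, give up leadership. */
	boolean shouldGiveUpLeadership(Instant lastSuccessfulRenew, Instant now) {
		return !lastSuccessfulRenew.plus(renewDeadline).isAfter(now);
	}
}
```

With these defaults, a leader that last renewed at `t0` gives up at `t0 + 15s`, while followers only treat the lease as expired from `t0 + 30s`.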

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionService.java
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.KubernetesLeaderElectionConfiguration;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.leaderelection.AbstractLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
+import static org.apache.flink.kubernetes.utils.Constants.LEADER_ADDRESS_KEY;
+import static org.apache.flink.kubernetes.utils.Constants.LEADER_SESSION_ID_KEY;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Leader election service for multiple JobManagers. The active JobManager is elected using Kubernetes.
+ * The current leader's address as well as its leader session ID is published via Kubernetes ConfigMap.
+ * Note that the contending lock and leader storage are using the same ConfigMap. And every component(e.g.
+ * ResourceManager, Dispatcher, RestEndpoint, JobManager for each job) will have a separate ConfigMap.
+ */
+public class KubernetesLeaderElectionService extends AbstractLeaderElectionService {
+
+	private final FlinkKubeClient kubeClient;
+
+	private final Executor executor;
+
+	private final String configMapName;
+
+	private final KubernetesLeaderElector leaderElector;
+
+	private KubernetesWatch kubernetesWatch;
+
+	// Labels will be used to clean up the ha related ConfigMaps.
+	private Map<String, String> configMapLabels;
+
+	KubernetesLeaderElectionService(
+			FlinkKubeClient kubeClient,
+			Executor executor,
+			KubernetesLeaderElectionConfiguration leaderConfig) {
+
+		this.kubeClient = checkNotNull(kubeClient, "Kubernetes client should not be null.");
+		this.executor = checkNotNull(executor, "Executor should not be null.");
+		this.configMapName = leaderConfig.getConfigMapName();
+		this.leaderElector = kubeClient.createLeaderElector(leaderConfig, new LeaderCallbackHandlerImpl());
+		this.leaderContender = null;
+		this.configMapLabels = KubernetesUtils.getConfigMapLabels(
+			leaderConfig.getClusterId(), LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY);
+	}
+
+	@Override
+	public void internalStart(LeaderContender contender) {
+		CompletableFuture.runAsync(leaderElector::run, executor);
+		kubernetesWatch = kubeClient.watchConfigMapsAndDoCallback(configMapName, new ConfigMapCallbackHandlerImpl());
+	}
+
+	@Override
+	public void internalStop() {
+		if (kubernetesWatch != null) {
+			kubernetesWatch.close();
+		}
+	}
+
+	@Override
+	protected void writeLeaderInformation() {
+		updateConfigMap(configMapName);
+	}
+
+	@Override
+	protected boolean checkLeaderLatch() {
+		return kubeClient.getConfigMap(configMapName)
+			.map(configMap -> KubernetesUtils.getLeaderChecker().test(configMap))
+			.orElse(false);
+	}
+
+	@Override
+	public String toString() {
+		return "KubernetesLeaderElectionService{configMapName='" + configMapName + "'}";
+	}
+
+	private void updateConfigMap(String configMapName) {

Review comment:
       I think the name `updateConfigMap` is not very descriptive. What this method really does is write its own leader information to the leader ConfigMap.

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionService.java
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.KubernetesLeaderElectionConfiguration;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.leaderelection.AbstractLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
+import static org.apache.flink.kubernetes.utils.Constants.LEADER_ADDRESS_KEY;
+import static org.apache.flink.kubernetes.utils.Constants.LEADER_SESSION_ID_KEY;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Leader election service for multiple JobManagers. The active JobManager is elected using Kubernetes.
+ * The current leader's address as well as its leader session ID is published via Kubernetes ConfigMap.
+ * Note that the contending lock and leader storage are using the same ConfigMap. And every component(e.g.
+ * ResourceManager, Dispatcher, RestEndpoint, JobManager for each job) will have a separate ConfigMap.
+ */
+public class KubernetesLeaderElectionService extends AbstractLeaderElectionService {
+
+	private final FlinkKubeClient kubeClient;
+
+	private final Executor executor;
+
+	private final String configMapName;
+
+	private final KubernetesLeaderElector leaderElector;
+
+	private KubernetesWatch kubernetesWatch;
+
+	// Labels will be used to clean up the ha related ConfigMaps.
+	private Map<String, String> configMapLabels;
+
+	KubernetesLeaderElectionService(
+			FlinkKubeClient kubeClient,
+			Executor executor,
+			KubernetesLeaderElectionConfiguration leaderConfig) {
+
+		this.kubeClient = checkNotNull(kubeClient, "Kubernetes client should not be null.");
+		this.executor = checkNotNull(executor, "Executor should not be null.");
+		this.configMapName = leaderConfig.getConfigMapName();
+		this.leaderElector = kubeClient.createLeaderElector(leaderConfig, new LeaderCallbackHandlerImpl());
+		this.leaderContender = null;
+		this.configMapLabels = KubernetesUtils.getConfigMapLabels(
+			leaderConfig.getClusterId(), LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY);
+	}
+
+	@Override
+	public void internalStart(LeaderContender contender) {
+		CompletableFuture.runAsync(leaderElector::run, executor);
+		kubernetesWatch = kubeClient.watchConfigMapsAndDoCallback(configMapName, new ConfigMapCallbackHandlerImpl());
+	}
+
+	@Override
+	public void internalStop() {
+		if (kubernetesWatch != null) {
+			kubernetesWatch.close();
+		}
+	}
+
+	@Override
+	protected void writeLeaderInformation() {
+		updateConfigMap(configMapName);
+	}
+
+	@Override
+	protected boolean checkLeaderLatch() {
+		return kubeClient.getConfigMap(configMapName)
+			.map(configMap -> KubernetesUtils.getLeaderChecker().test(configMap))
+			.orElse(false);
+	}
+
+	@Override
+	public String toString() {
+		return "KubernetesLeaderElectionService{configMapName='" + configMapName + "'}";
+	}
+
+	private void updateConfigMap(String configMapName) {
+		try {
+			kubeClient.checkAndUpdateConfigMap(
+				configMapName,
+				KubernetesUtils.getLeaderChecker(),
+				configMap -> {
+					// Get the updated ConfigMap with new leader information
+					if (confirmedLeaderAddress != null && confirmedLeaderSessionID != null) {
+						configMap.getData().put(LEADER_ADDRESS_KEY, confirmedLeaderAddress);
+						configMap.getData().put(LEADER_SESSION_ID_KEY, confirmedLeaderSessionID.toString());
+					}
+					configMap.getLabels().putAll(configMapLabels);
+					return configMap;
+				}).get();
+		} catch (Exception e) {
+			leaderContender.handleError(new Exception("Could not update ConfigMap " + configMapName, e));
+		}
+	}
+
+	private class LeaderCallbackHandlerImpl extends KubernetesLeaderElector.LeaderCallbackHandler {
+
+		@Override
+		public void isLeader() {
+			onGrantLeadership();
+		}
+
+		@Override
+		public void notLeader() {
+			// Clear the leader information in ConfigMap
+			try {
+				kubeClient.checkAndUpdateConfigMap(
+					configMapName,
+					KubernetesUtils.getLeaderChecker(),
+					configMap -> {
+						configMap.getData().remove(LEADER_ADDRESS_KEY);
+						configMap.getData().remove(LEADER_SESSION_ID_KEY);
+						return configMap;
+					}
+				).get();
+			} catch (Exception e) {
+				leaderContender.handleError(
+					new Exception("Could not remove leader information from ConfigMap " + configMapName, e));
+			}
+			onRevokeLeadership();
+			// Continue to contend the leader
+			CompletableFuture.runAsync(leaderElector::run, executor);
+		}
+	}
+
+	private class ConfigMapCallbackHandlerImpl implements FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap> {
+
+		@Override
+		public void onAdded(List<KubernetesConfigMap> configMaps) {
+			// noop
+		}
+
+		@Override
+		public void onModified(List<KubernetesConfigMap> configMaps) {
+			if (checkLeaderLatch()) {
+				configMaps.forEach(configMap -> {
+					if (isLeaderChanged(configMap)) {
+						// the data field does not correspond to the expected leader information
+						if (logger.isDebugEnabled()) {
+							logger.debug("Correcting leader information in {} by {}.",
+								configMapName, leaderContender.getDescription());
+						}
+						updateConfigMap(configMap.getName());
+					}
+				});
+			}
+		}

Review comment:
       Is it possible that this method gets called before `confirmLeadership`, so that the leader election service writes the leader information before the contender has confirmed leadership?
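One possible guard is sketched below. The watch callback only "corrects" the ConfigMap once a confirmed session ID exists, and both paths take the same lock, so a correction cannot interleave with confirmation. The class, its fields, and the plain `Map` standing in for the ConfigMap data are assumptions. The `address`/`sessionId` keys mirror `LEADER_ADDRESS_KEY` and `LEADER_SESSION_ID_KEY` from `Constants` in this PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

class LeaderInfoPublisherSketch {

	/** Guards confirmedLeaderSessionID, confirmedLeaderAddress and configMapData. */
	private final Object lock = new Object();

	private UUID confirmedLeaderSessionID;
	private String confirmedLeaderAddress;

	// Stand-in for the leader ConfigMap's data field.
	final Map<String, String> configMapData = new HashMap<>();

	void confirmLeadership(UUID sessionId, String address) {
		synchronized (lock) {
			confirmedLeaderSessionID = sessionId;
			confirmedLeaderAddress = address;
			writeLeaderInformation();
		}
	}

	// Called from the ConfigMap watch when the data was modified externally.
	void onModified() {
		synchronized (lock) {
			if (confirmedLeaderSessionID == null) {
				return; // not confirmed yet; confirmLeadership will publish the information
			}
			final boolean changed =
				!confirmedLeaderAddress.equals(configMapData.get("address"))
					|| !confirmedLeaderSessionID.toString().equals(configMapData.get("sessionId"));
			if (changed) {
				writeLeaderInformation();
			}
		}
	}

	private void writeLeaderInformation() {
		configMapData.put("address", confirmedLeaderAddress);
		configMapData.put("sessionId", confirmedLeaderSessionID.toString());
	}
}
```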

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalService.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+
+import static org.apache.flink.kubernetes.utils.Constants.LEADER_ADDRESS_KEY;
+import static org.apache.flink.kubernetes.utils.Constants.LEADER_SESSION_ID_KEY;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The counterpart to the {@link org.apache.flink.kubernetes.highavailability.KubernetesLeaderElectionService}.
+ * This implementation of the {@link LeaderRetrievalService} retrieves the current leader which has
+ * been elected by the {@link org.apache.flink.kubernetes.highavailability.KubernetesLeaderElectionService}.
+ * The leader address as well as the current leader session ID is retrieved from Kubernetes ConfigMap.
+ */
+class KubernetesLeaderRetrievalService implements LeaderRetrievalService {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeaderRetrievalService.class);
+
+	private final Object lock = new Object();

Review comment:
       It would be better to document which states/variables are guarded by this lock.
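As an illustration, here is a sketch of a retrieval service whose lock guards the last seen leader information. Concurrent watch callbacks then notify the listener at most once per actual change. The names are hypothetical, and a `BiConsumer` stands in for `LeaderRetrievalListener`. This sketch notifies while holding the lock to keep notifications ordered. A real implementation may prefer to call out without holding it.

```java
import java.util.Objects;
import java.util.UUID;
import java.util.function.BiConsumer;

class LeaderRetrievalSketch {

	/** Guards lastLeaderAddress and lastLeaderSessionID. */
	private final Object lock = new Object();

	private String lastLeaderAddress;
	private UUID lastLeaderSessionID;

	private final BiConsumer<String, UUID> listener;

	LeaderRetrievalSketch(BiConsumer<String, UUID> listener) {
		this.listener = Objects.requireNonNull(listener);
	}

	/** Called for every ConfigMap update observed by the watch. */
	void onLeaderInformation(String address, UUID sessionId) {
		synchronized (lock) {
			if (Objects.equals(address, lastLeaderAddress)
					&& Objects.equals(sessionId, lastLeaderSessionID)) {
				return; // unchanged; do not notify again
			}
			lastLeaderAddress = address;
			lastLeaderSessionID = sessionId;
			listener.accept(address, sessionId);
		}
	}
}
```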

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesHighAvailabilityOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.KubernetesLeaderElectionConfiguration;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.blob.BlobStore;
+import org.apache.flink.runtime.blob.BlobStoreService;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneRunningJobsRegistry;
+import org.apache.flink.runtime.jobmanager.JobGraphStore;
+import org.apache.flink.runtime.jobmanager.StandaloneJobGraphStore;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
+import static org.apache.flink.kubernetes.utils.Constants.NAME_SEPARATOR;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * High availability service for Kubernetes.
+ */
+public class KubernetesHaServices implements HighAvailabilityServices {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KubernetesHaServices.class);
+
+	private static final String RESOURCE_MANAGER_NAME = "resourcemanager";
+
+	private static final String DISPATCHER_NAME = "dispatcher";
+
+	private static final String JOB_MANAGER_NAME = "jobmanager";
+
+	private static final String REST_SERVER_NAME = "restserver";
+
+	private final String leaderSuffix;
+
+	private final String clusterId;
+
+	/** Kubernetes client. */
+	private final FlinkKubeClient kubeClient;
+
+	/** The executor to run Kubernetes operations on. */
+	private final Executor executor;
+
+	/** The runtime configuration. */
+	private final Configuration configuration;
+
+	/** Store for arbitrary blobs. */
+	private final BlobStoreService blobStoreService;
+
+	/** The Kubernetes based running jobs registry. */
+	private final RunningJobsRegistry runningJobsRegistry;
+
+	KubernetesHaServices(
+			FlinkKubeClient kubeClient,
+			Executor executor,
+			Configuration config,
+			BlobStoreService blobStoreService) {
+
+		this.kubeClient = checkNotNull(kubeClient);
+		this.executor = checkNotNull(executor);
+		this.configuration = checkNotNull(config);
+		this.clusterId = checkNotNull(config.get(KubernetesConfigOptions.CLUSTER_ID));
+		this.blobStoreService = blobStoreService;
+
+		this.leaderSuffix = config.getString(KubernetesHighAvailabilityOptions.HA_KUBERNETES_LEADER_SUFFIX);
+
+		this.runningJobsRegistry = new StandaloneRunningJobsRegistry();
+	}
+
+	@Override
+	public LeaderRetrievalService getResourceManagerLeaderRetriever() {
+		return createLeaderRetrievalService(RESOURCE_MANAGER_NAME);
+	}
+
+	@Override
+	public LeaderRetrievalService getDispatcherLeaderRetriever() {
+		return createLeaderRetrievalService(DISPATCHER_NAME);
+	}
+
+	@Override
+	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
+		return createLeaderRetrievalService(getLeaderNameForJobManager(jobID));
+	}
+
+	@Override
+	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) {
+		return getJobManagerLeaderRetriever(jobID);
+	}
+
+	@Override
+	public LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
+		return createLeaderRetrievalService(REST_SERVER_NAME);
+	}
+
+	@Override
+	public LeaderElectionService getResourceManagerLeaderElectionService() {
+		return createLeaderElectionService(RESOURCE_MANAGER_NAME);
+	}
+
+	@Override
+	public LeaderElectionService getDispatcherLeaderElectionService() {
+		return createLeaderElectionService(DISPATCHER_NAME);
+	}
+
+	@Override
+	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
+		return createLeaderElectionService(getLeaderNameForJobManager(jobID));
+	}
+
+	@Override
+	public LeaderElectionService getClusterRestEndpointLeaderElectionService() {
+		return createLeaderElectionService(REST_SERVER_NAME);
+	}
+
+	@Override
+	public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
+		return new StandaloneCheckpointRecoveryFactory();
+	}
+
+	@Override
+	public JobGraphStore getJobGraphStore() {
+		return new StandaloneJobGraphStore();
+	}
+
+	@Override
+	public RunningJobsRegistry getRunningJobsRegistry() {
+		return runningJobsRegistry;
+	}
+
+	@Override
+	public BlobStore createBlobStore() {
+		return blobStoreService;
+	}
+
+	@Override
+	public void close() throws Exception {
+		Throwable exception = null;
+
+		try {
+			blobStoreService.close();
+		} catch (Throwable t) {
+			exception = t;
+		}
+
+		kubeClient.close();
+
+		if (exception != null) {
+			ExceptionUtils.rethrowException(exception, "Could not properly close the KubernetesHaServices.");
+		}
+	}
+
+	@Override
+	public void closeAndCleanupAllData() throws Exception {
+		LOG.info("Close and clean up all data for KubernetesHaServices.");
+
+		Throwable exception = null;
+
+		try {
+			blobStoreService.closeAndCleanupAllData();
+		} catch (Throwable t) {
+			exception = t;
+		}
+
+		try {
+			kubeClient.deleteConfigMapsByLabels(
+				KubernetesUtils.getConfigMapLabels(clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY));
+		} catch (Throwable t) {
+			exception = ExceptionUtils.firstOrSuppressed(t, exception);
+		}
+
+		kubeClient.close();
+
+		if (exception != null) {
+			ExceptionUtils.rethrowException(
+				exception, "Could not properly close and clean up all data of KubernetesHaServices.");
+		}
+		LOG.info("Finished cleaning up the high availability data.");
+	}
+
+	private KubernetesLeaderElectionService createLeaderElectionService(String leaderName) {
+		return new KubernetesLeaderElectionService(
+			kubeClient,
+			executor,
+			KubernetesLeaderElectionConfiguration.fromConfiguration(getLeaderConfigMapName(leaderName), configuration));
+	}
+
+	private KubernetesLeaderRetrievalService createLeaderRetrievalService(String leaderName) {
+		return new KubernetesLeaderRetrievalService(kubeClient, getLeaderConfigMapName(leaderName));
+	}

Review comment:
       I think we can make these two methods abstract methods in `AbstractHaService` and provide different implementations in `Kubernetes/ZooKeeperHaService`. We can also move the component names to the common base class and convert them to lock paths in `ZooKeeperHaService#createLeaderElection/RetrievalService`. This way, all the `getXXXLeaderElectionService/Retriever` methods can be moved to the base class and reused.
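A minimal sketch of this refactoring. It uses hypothetical stand-in types (`ElectionService`, `RetrievalService` and the `*Sketch` classes) rather than Flink's real interfaces. The ConfigMap name follows the `k8s-ha-<cluster-id>-<component-name>-leader` reading of the naming pattern, and the ZooKeeper lock path layout is made up for illustration. The base class defines the component names and the public getters once, and each backend only implements the two factory hooks.

```java
// Hypothetical stand-in for a leader election service bound to one lock.
final class ElectionService {
    final String lockName;
    ElectionService(String lockName) { this.lockName = lockName; }
}

// Hypothetical stand-in for a leader retrieval service watching one lock.
final class RetrievalService {
    final String lockName;
    RetrievalService(String lockName) { this.lockName = lockName; }
}

// Common base class: component names and getters are written once and reused.
abstract class AbstractHaServicesSketch {
    static final String RESOURCE_MANAGER_NAME = "resourcemanager";
    static final String DISPATCHER_NAME = "dispatcher";

    ElectionService getResourceManagerLeaderElectionService() {
        return createLeaderElectionService(RESOURCE_MANAGER_NAME);
    }

    RetrievalService getResourceManagerLeaderRetriever() {
        return createLeaderRetrievalService(RESOURCE_MANAGER_NAME);
    }

    ElectionService getDispatcherLeaderElectionService() {
        return createLeaderElectionService(DISPATCHER_NAME);
    }

    RetrievalService getDispatcherLeaderRetriever() {
        return createLeaderRetrievalService(DISPATCHER_NAME);
    }

    // Backend-specific hooks: turn a component name into the backend's lock identifier.
    protected abstract ElectionService createLeaderElectionService(String componentName);

    protected abstract RetrievalService createLeaderRetrievalService(String componentName);
}

// Kubernetes: the component name becomes part of a ConfigMap name (assumed format).
final class KubernetesHaServicesSketch extends AbstractHaServicesSketch {
    private final String clusterId;

    KubernetesHaServicesSketch(String clusterId) {
        this.clusterId = clusterId;
    }

    private String configMapName(String componentName) {
        return "k8s-ha-" + clusterId + "-" + componentName + "-leader";
    }

    @Override
    protected ElectionService createLeaderElectionService(String componentName) {
        return new ElectionService(configMapName(componentName));
    }

    @Override
    protected RetrievalService createLeaderRetrievalService(String componentName) {
        return new RetrievalService(configMapName(componentName));
    }
}

// ZooKeeper: the component name is converted into a lock path (illustrative layout).
final class ZooKeeperHaServicesSketch extends AbstractHaServicesSketch {
    private static String lockPath(String componentName) {
        return "/leader/" + componentName + "_lock";
    }

    @Override
    protected ElectionService createLeaderElectionService(String componentName) {
        return new ElectionService(lockPath(componentName));
    }

    @Override
    protected RetrievalService createLeaderRetrievalService(String componentName) {
        return new RetrievalService(lockPath(componentName));
    }
}
```

With this shape, adding a component or a backend only touches one place.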

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesWatcher.java
##########
@@ -0,0 +1,51 @@
+/**
+ * Watcher for resources in Kubernetes.
+ */
+public abstract class KubernetesWatcher<T extends HasMetadata, K extends KubernetesResource<T>> implements Watcher<T> {

Review comment:
       It would be better to put the changes to this class and `KubernetesPodsWatcher` in a separate commit. It seems to me that these changes re-abstract the watchers, rather than add the watcher callbacks described in the commit message.

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesHighAvailabilityOptions.java
##########
@@ -0,0 +1,78 @@
+package org.apache.flink.kubernetes.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.ConfigOption;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to Kubernetes high-availability settings.
+ * All the HA information relevant for a specific component will be stored in a single ConfigMap.
+ * For example, the Dispatcher's ConfigMap would then contain the current leader, the running jobs
+ * and the pointers to the persisted JobGraphs.
+ * The JobManager's ConfigMap would then contain the current leader, the pointers to the checkpoints
+ * and the checkpoint ID counter.
+ *
+ * <p>The ConfigMap name will be created with the following pattern.
+ * e.g. k8s-ha-app1-restserver-leader, k8s-ha-app1-00000000000000000000000000000000-jobmanager-leader
+ */
+@PublicEvolving
+public class KubernetesHighAvailabilityOptions {
+
+	@Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+	public static final ConfigOption<String> HA_KUBERNETES_LEADER_SUFFIX =
+			key("high-availability.kubernetes.leader.suffix")
+			.stringType()
+			.defaultValue("leader")
+			.withDescription("The ConfigMap suffix of the leader which contains the URL to the leader and the " +
+				"current leader session ID. Leader elector will use the same ConfigMap for contending the lock.");
+
+	@Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+	public static final ConfigOption<Duration> KUBERNETES_LEASE_DURATION =
+			key("high-availability.kubernetes.client.lease-duration")
+			.durationType()
+			.defaultValue(Duration.ofSeconds(30))
+			.withDescription("Define the lease duration for the Kubernetes leader election in ms. The leader will " +
+				"continuously renew its lease time to indicate its existence. And the followers will do a lease " +
+				"checking against the current time. \"renewTime + leaseDuration > now\" means the leader is alive.");
+
+	@Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+	public static final ConfigOption<Duration> KUBERNETES_RENEW_DEADLINE =
+			key("high-availability.kubernetes.client.renew-deadline")
+			.durationType()
+			.defaultValue(Duration.ofSeconds(15))
+			.withDescription("Defines the deadline when the leader tries to renew the lease in ms. If it could not " +
+				"succeed in the given time, the renew operation will be aborted.");
+
+	@Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+	public static final ConfigOption<Duration> KUBERNETES_RETRY_PERIOD =
+			key("high-availability.kubernetes.client.retry-period")

Review comment:
       ```suggestion
   			key("high-availability.kubernetes.leader-election.retry-period")
   ```

##########
File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java
##########
@@ -0,0 +1,127 @@
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.KubernetesLeaderElectionConfiguration;
+import org.apache.flink.kubernetes.kubeclient.TestingFlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.runtime.leaderelection.TestingContender;
+import org.apache.flink.runtime.leaderelection.TestingListener;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.RunnableWithException;
+
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link KubernetesLeaderElectionService}.
+ */
+public class KubernetesHighAvailabilityTestBase extends TestLogger {
+
+	private final ExecutorService executorService =
+		Executors.newFixedThreadPool(4, new ExecutorThreadFactory("IO-Executor"));
+	private final Configuration configuration = new Configuration();
+
+	protected static final String CLUSTER_ID = "leader-test-cluster";
+	protected static final String LEADER_URL = "akka.tcp://flink@172.20.1.21:6123/user/rpc/resourcemanager";
+	protected static final long TIMEOUT = 30L * 1000L;
+	protected static final String LEADER_CONFIGMAP_NAME = "k8s-ha-app1-resourcemanager";
+	protected final Map<String, KubernetesConfigMap> configMapStore = new HashMap<>();
+	protected final CompletableFuture<FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap>> configMapsAndDoCallbackFuture =
+		new CompletableFuture<>();
+	protected final CompletableFuture<FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap>> leaderRetrievalConfigMapCallback =
+		new CompletableFuture<>();
+
+	@Before
+	public void setup() {
+		configuration.setString(KubernetesConfigOptions.CLUSTER_ID, CLUSTER_ID);
+	}
+
+	@After
+	public void teardown() {
+		executorService.shutdownNow();
+	}
+
+	protected KubernetesLeaderElectionService createLeaderElectionService(AtomicBoolean leaderController) {
+		final TestingFlinkKubeClient flinkKubeClient = TestingFlinkKubeClient.builder()
+			.setConfigMapStore(configMapStore)
+			.setWatchConfigMapsAndDoCallbackFunction((ignore, handler) -> {
+				configMapsAndDoCallbackFuture.complete(handler);
+				return new TestingFlinkKubeClient.MockKubernetesWatch();
+			})
+			.setLeaderController(leaderController).build();
+		return new KubernetesLeaderElectionService(
+			flinkKubeClient,
+			executorService,
+			KubernetesLeaderElectionConfiguration.fromConfiguration(LEADER_CONFIGMAP_NAME, configuration));
+	}
+
+	protected KubernetesLeaderRetrievalService createLeaderRetrievalService() {
+		final TestingFlinkKubeClient flinkKubeClient = TestingFlinkKubeClient.builder()
+			.setConfigMapStore(configMapStore)
+			.setWatchConfigMapsAndDoCallbackFunction((ignore, handler) -> {
+				leaderRetrievalConfigMapCallback.complete(handler);
+				return new TestingFlinkKubeClient.MockKubernetesWatch();
+			}).build();
+		return new KubernetesLeaderRetrievalService(flinkKubeClient, LEADER_CONFIGMAP_NAME);
+	}
+
+	/**
+	 * Context to leader election and retrieval tests.
+	 */
+	protected class Context {
+		final AtomicBoolean leaderController = new AtomicBoolean(false);
+		final KubernetesLeaderElectionService leaderElectionService = createLeaderElectionService(leaderController);
+		final TestingContender contender = new TestingContender(LEADER_URL, leaderElectionService);
+
+		final KubernetesLeaderRetrievalService leaderRetrievalService = createLeaderRetrievalService();
+		final TestingListener listener = new TestingListener();
+
+		protected final void runTest(RunnableWithException testMethod) throws Exception {
+			leaderElectionService.start(contender);
+			leaderController.set(true);
+			contender.waitForLeader(TIMEOUT);
+			assertThat(contender.isLeader(), is(true));
+			leaderRetrievalService.start(listener);
+			testMethod.run();
+			leaderElectionService.stop();
+			leaderRetrievalService.stop();
+		}

Review comment:
       I would suggest moving the step that grants leadership to the contender out of this method, for better readability.
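A rough sketch of the suggested split. It uses hypothetical minimal stand-ins (`ContenderSketch`, `TestBody`, `ContextSketch`) instead of the real `TestingContender`, `RunnableWithException` and `KubernetesLeaderElectionService`, so only the shape of the refactoring is meant to carry over. Granting leadership becomes its own helper, and `runTest` only handles starting and stopping the services around the test body.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for TestingContender: records that leadership was granted.
final class ContenderSketch {
    private final CountDownLatch granted = new CountDownLatch(1);

    void grantLeadership() {
        granted.countDown();
    }

    boolean waitForLeader(long timeoutMs) throws InterruptedException {
        return granted.await(timeoutMs, TimeUnit.MILLISECONDS);
    }
}

// Mirrors the shape of Flink's RunnableWithException.
interface TestBody {
    void run() throws Exception;
}

// Hypothetical test context with the leadership step pulled out of runTest.
final class ContextSketch {
    final AtomicBoolean leaderController = new AtomicBoolean(false);
    final ContenderSketch contender = new ContenderSketch();
    boolean servicesRunning;

    // Lifecycle only: start the services, run the test body, always stop the services.
    void runTest(TestBody testMethod) throws Exception {
        servicesRunning = true; // stands in for the election/retrieval service start() calls
        try {
            testMethod.run();
        } finally {
            servicesRunning = false; // stands in for the stop() calls
        }
    }

    // Explicit, separately named step for tests that need a leader.
    void grantLeadership(long timeoutMs) throws InterruptedException {
        leaderController.set(true);
        // In the real test the election service reacts to the controller; simulated here.
        contender.grantLeadership();
        if (!contender.waitForLeader(timeoutMs)) {
            throw new AssertionError("Contender did not become leader within " + timeoutMs + " ms");
        }
    }
}
```

A test that needs a leader would then read as `context.runTest(() -> { context.grantLeadership(TIMEOUT); ... })`. The `finally` also stops the services when an assertion fails, which the current `runTest` does not do.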

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesHighAvailabilityOptions.java
##########
@@ -0,0 +1,78 @@
+/**
+ * The set of configuration options relating to Kubernetes high-availability settings.
+ * All the HA information relevant for a specific component will be stored in a single ConfigMap.
+ * For example, the Dispatcher's ConfigMap would then contain the current leader, the running jobs
+ * and the pointers to the persisted JobGraphs.
+ * The JobManager's ConfigMap would then contain the current leader, the pointers to the checkpoints
+ * and the checkpoint ID counter.
+ *
+ * <p>The ConfigMap name will be created with the following pattern.
+ * e.g. k8s-ha-app1-restserver-leader, k8s-ha-app1-00000000000000000000000000000000-jobmanager-leader

Review comment:
       I think giving the pattern (e.g. as a regex) before the examples would help readers understand it.
   IIUC, is it `k8s-ha-<cluster-id>-<component-name>-leader`?
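A sketch of the naming under the reviewer's reading of the pattern. This reading is an assumption that the code shown here does not confirm: `k8s-ha-<cluster-id>-<component-name>-<suffix>`, with the suffix presumably taken from `high-availability.kubernetes.leader.suffix` (default `leader`). The job-specific component name is assumed to be `<job-id>-jobmanager`, as the second example suggests. `LeaderConfigMapNames` is a hypothetical helper. A regex for this pattern has to pin down the component part, because cluster IDs may themselves contain `-`.

```java
import java.util.regex.Pattern;

// Hypothetical helper for the assumed layout k8s-ha-<cluster-id>-<component-name>-<suffix>.
final class LeaderConfigMapNames {
    private LeaderConfigMapNames() {}

    // The suffix would presumably come from high-availability.kubernetes.leader.suffix.
    static String leaderConfigMapName(String clusterId, String componentName, String suffix) {
        return "k8s-ha-" + clusterId + "-" + componentName + "-" + suffix;
    }

    // Assumed component name for a per-job JobManager: <job-id>-jobmanager.
    static String jobManagerComponentName(String jobId) {
        return jobId + "-jobmanager";
    }

    // Regex for the default "leader" suffix. The component group lists the known names explicitly,
    // since a cluster ID may itself contain '-'. Group 1 is the cluster ID, group 2 the component.
    static final Pattern DEFAULT_LEADER_PATTERN = Pattern.compile(
            "^k8s-ha-(.+)-(resourcemanager|dispatcher|restserver|[0-9a-f]{32}-jobmanager)-leader$");
}
```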

##########
File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java
##########
@@ -0,0 +1,127 @@
+/**
+ * Tests for the {@link KubernetesLeaderElectionService}.
+ */
+public class KubernetesHighAvailabilityTestBase extends TestLogger {
+
+	private final ExecutorService executorService =
+		Executors.newFixedThreadPool(4, new ExecutorThreadFactory("IO-Executor"));
+	private final Configuration configuration = new Configuration();

Review comment:
       It seems that `executorService` and `configuration` are reused across test cases, which could cause stability issues.
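If that is the concern, one option is to create both objects per test in the `@Before` method and release them in `@After`. Below is a plain-Java sketch of that lifecycle. The JUnit annotations are left out so the sketch stays self-contained, `HaTestFixtureSketch` is a hypothetical name, and a `Map` stands in for Flink's `Configuration`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class HaTestFixtureSketch {
    // Non-final fields, recreated for every test instead of being initialized once.
    ExecutorService executorService;
    Map<String, String> configuration; // stand-in for Flink's Configuration

    // Would be annotated with @Before in the JUnit test.
    void setup() {
        executorService = Executors.newFixedThreadPool(4);
        configuration = new HashMap<>();
        configuration.put("kubernetes.cluster-id", "leader-test-cluster");
    }

    // Would be annotated with @After in the JUnit test.
    void teardown() throws InterruptedException {
        executorService.shutdownNow();
        // Wait so that threads from one test cannot overlap with the next test.
        executorService.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```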

##########
File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java
##########
@@ -0,0 +1,127 @@
+	/**
+	 * Context to leader election and retrieval tests.
+	 */
+	protected class Context {
+		final AtomicBoolean leaderController = new AtomicBoolean(false);
+		final KubernetesLeaderElectionService leaderElectionService = createLeaderElectionService(leaderController);
+		final TestingContender contender = new TestingContender(LEADER_URL, leaderElectionService);
+
+		final KubernetesLeaderRetrievalService leaderRetrievalService = createLeaderRetrievalService();
+		final TestingListener listener = new TestingListener();
+
+		protected final void runTest(RunnableWithException testMethod) throws Exception {
+			leaderElectionService.start(contender);
+			leaderController.set(true);
+			contender.waitForLeader(TIMEOUT);
+			assertThat(contender.isLeader(), is(true));
+			leaderRetrievalService.start(listener);
+			testMethod.run();
+			leaderElectionService.stop();
+			leaderRetrievalService.stop();
+		}

Review comment:
       Alternatively, rename the method to `runTestAndGrantLeadershipToContender` if this step is needed in almost every test case.

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesHighAvailabilityOptions.java
##########
@@ -0,0 +1,78 @@
+	@Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+	public static final ConfigOption<Duration> KUBERNETES_RETRY_PERIOD =
+			key("high-availability.kubernetes.client.retry-period")
+			.durationType()
+			.defaultValue(Duration.ofSeconds(3))
+			.withDescription("Defines the pause between consecutive retries in ms. Both the leader and followers use " +
+				"this value for the retry.");

Review comment:
       It's not clear what a retry means here. I think we should explain that all contenders periodically try to acquire or renew the leadership at this interval, if possible.
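A minimal sketch of the semantics that the option descriptions above spell out. The current holder is considered alive while `renewTime + leaseDuration > now`, and every contender, leader or follower, runs the same acquire-or-renew attempt with a pause of one retry period between attempts. `LeaseSketch` and its methods are hypothetical. This is not the fabric8 or Flink implementation, and it leaves out the renew deadline.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class LeaseSketch {
    private LeaseSketch() {}

    // The current holder is considered alive while renewTime + leaseDuration > now.
    static boolean isLeaseValid(Instant renewTime, Duration leaseDuration, Instant now) {
        return renewTime.plus(leaseDuration).isAfter(now);
    }

    // A contender may write the lock if it already holds it (renew) or the lease expired (acquire).
    static boolean mayAcquireOrRenew(
            String self, String holder, Instant renewTime, Duration leaseDuration, Instant now) {
        return self.equals(holder) || !isLeaseValid(renewTime, leaseDuration, now);
    }

    // Leader and followers run the same attempt, pausing retryPeriod between consecutive attempts.
    static ScheduledFuture<?> scheduleContention(
            ScheduledExecutorService scheduler, Duration retryPeriod, Runnable tryAcquireOrRenew) {
        return scheduler.scheduleWithFixedDelay(
                tryAcquireOrRenew, 0L, retryPeriod.toMillis(), TimeUnit.MILLISECONDS);
    }
}
```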

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java
##########
@@ -0,0 +1,190 @@
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.util.UUID;
+
+/**
+ * Abstract class for leader election service based on distributed coordination system(e.g. Zookeeper, Kubernetes, etc.).
+ */
+public abstract class AbstractLeaderElectionService implements LeaderElectionService {
+
+	protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+	protected final Object lock = new Object();

Review comment:
       IIUC, all reads and writes of the `volatile` fields should be performed in a `synchronized` block guarded by this lock? Then why is `internalStart` also guarded by this lock?

##########
File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
##########
@@ -249,4 +265,58 @@ public void testStopAndCleanupCluster() throws Exception {
 		this.flinkKubeClient.stopAndCleanupCluster(CLUSTER_ID);
 		assertTrue(this.kubeClient.apps().deployments().inNamespace(NAMESPACE).list().getItems().isEmpty());
 	}
+
+	@Test
+	public void testCreateAndDeleteConfigMap() {
+		this.flinkKubeClient.createConfigMap(buildHAConfigMap());
+		assertThat(this.flinkKubeClient.getConfigMap(LEADER_CONFIG_MAP_NAME).isPresent(), is(true));
+		this.flinkKubeClient.deleteConfigMapsByLabels(haLabels);
+		assertThat(this.flinkKubeClient.getConfigMap(LEADER_CONFIG_MAP_NAME).isPresent(), is(false));
+	}

Review comment:
       I think we should also verify that `createConfigMap` does not overwrite an existing config map.
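A sketch of the additional check against a hypothetical in-memory store (`InMemoryConfigMapStore`), since this diff does not show how `createConfigMap` reports a conflict. The assumed contract mirrors the Kubernetes API server, which rejects a create for an existing name with an `AlreadyExists` (HTTP 409 Conflict) error: the second create fails and the original data stays untouched.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class InMemoryConfigMapStore {
    private final ConcurrentMap<String, Map<String, String>> configMaps = new ConcurrentHashMap<>();

    // Create-only semantics: fails instead of overwriting an existing ConfigMap.
    void createConfigMap(String name, Map<String, String> data) {
        Map<String, String> previous =
                configMaps.putIfAbsent(name, Collections.unmodifiableMap(new HashMap<>(data)));
        if (previous != null) {
            throw new IllegalStateException("ConfigMap " + name + " already exists");
        }
    }

    Optional<Map<String, String>> getConfigMap(String name) {
        return Optional.ofNullable(configMaps.get(name));
    }
}
```

The real test would presumably call `createConfigMap(buildHAConfigMap())` twice, then assert that the second call fails and that the contents of `LEADER_CONFIG_MAP_NAME` are unchanged.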




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org