Posted to commits@flink.apache.org by tr...@apache.org on 2021/05/18 07:12:08 UTC

[flink] branch master updated: [FLINK-20695][ha] Clean ha data for job if globally terminated

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8cf6f84  [FLINK-20695][ha] Clean ha data for job if globally terminated
8cf6f84 is described below

commit 8cf6f84f1b99ae19ddd18e38a370d5f47325b6f1
Author: Yi Tang <ss...@gmail.com>
AuthorDate: Mon Apr 12 12:59:40 2021 +0800

    [FLINK-20695][ha] Clean ha data for job if globally terminated
    
    At the moment Flink only cleans up the HA data (e.g. K8s ConfigMaps, or
    ZooKeeper nodes) while shutting down the cluster. This is not enough for
    a long-running session cluster to which you submit multiple jobs. In
    this commit, we clean up the data for a particular job once it reaches a
    globally terminal state.
    
    This closes #15561.
---
 .../highavailability/KubernetesHaServices.java     |  5 ++
 .../highavailability/KubernetesHaServicesTest.java | 34 ++++++++++++
 .../flink/runtime/dispatcher/Dispatcher.java       | 20 ++++++--
 .../highavailability/AbstractHaServices.java       | 16 ++++++
 .../highavailability/HighAvailabilityServices.java |  8 +++
 .../nonha/AbstractNonHaServices.java               |  4 ++
 .../zookeeper/ZooKeeperHaServices.java             | 33 +++++++++++-
 .../dispatcher/DispatcherResourceCleanupTest.java  | 28 ++++++++++
 .../highavailability/AbstractHaServicesTest.java   | 42 +++++++++++++--
 .../TestingHighAvailabilityServices.java           | 12 +++++
 .../TestingManualHighAvailabilityServices.java     |  5 ++
 .../zookeeper/ZooKeeperHaServicesTest.java         | 60 +++++++++++++++++++++-
 12 files changed, 257 insertions(+), 10 deletions(-)
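
In outline, the patch adds one public entry point, HighAvailabilityServices#cleanupJobData(JobID), which AbstractHaServices implements as a logging wrapper around a new backend hook, internalCleanupJobData(JobID). A minimal standalone sketch of that template-method shape (the classes below are hypothetical stand-ins, not Flink code; the in-memory map stands in for ConfigMaps or znodes):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Public entry point logs around a backend-specific hook, mirroring
    // AbstractHaServices#cleanupJobData in the diff below.
    abstract class HaServicesSketch {
        public final void cleanupJobData(String jobId) throws Exception {
            System.out.println("Clean up the high availability data for job " + jobId + ".");
            internalCleanupJobData(jobId);
            System.out.println("Finished cleaning up the high availability data for job " + jobId + ".");
        }

        protected abstract void internalCleanupJobData(String jobId) throws Exception;
    }

    // Hypothetical in-memory backend; KubernetesHaServices and
    // ZooKeeperHaServices play this role in the real patch.
    class InMemoryHaServicesSketch extends HaServicesSketch {
        private final Map<String, byte[]> store = new ConcurrentHashMap<>();

        @Override
        protected void internalCleanupJobData(String jobId) {
            store.keySet().removeIf(key -> key.contains(jobId));
        }
    }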

diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java
index f5ff366..b7d35f8 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java
@@ -143,6 +143,11 @@ public class KubernetesHaServices extends AbstractHaServices {
     }
 
     @Override
+    public void internalCleanupJobData(JobID jobID) throws Exception {
+        kubeClient.deleteConfigMap(getLeaderNameForJobManager(jobID)).get();
+    }
+
+    @Override
     protected String getLeaderNameForResourceManager() {
         return getLeaderName(RESOURCE_MANAGER_NAME);
     }
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServicesTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServicesTest.java
index d6b69c3..cd78491 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServicesTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServicesTest.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.kubernetes.highavailability;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.kubernetes.kubeclient.TestingFlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 
 import org.junit.Test;
@@ -76,4 +79,35 @@ public class KubernetesHaServicesTest extends KubernetesHighAvailabilityTestBase
             }
         };
     }
+
+    @Test
+    public void testInternalJobCleanupShouldCleanupConfigMaps() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            final KubernetesHaServices kubernetesHaServices =
+                                    new KubernetesHaServices(
+                                            flinkKubeClient,
+                                            executorService,
+                                            configuration,
+                                            new VoidBlobStore());
+                            JobID jobID = new JobID();
+                            String configMapName =
+                                    kubernetesHaServices.getLeaderNameForJobManager(jobID);
+                            final KubernetesConfigMap configMap =
+                                    new TestingFlinkKubeClient.MockKubernetesConfigMap(
+                                            configMapName);
+                            flinkKubeClient.createConfigMap(configMap);
+                            assertThat(
+                                    flinkKubeClient.getConfigMap(configMapName).isPresent(),
+                                    is(true));
+                            kubernetesHaServices.internalCleanupJobData(jobID);
+                            assertThat(
+                                    flinkKubeClient.getConfigMap(configMapName).isPresent(),
+                                    is(false));
+                        });
+            }
+        };
+    }
 }
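
The blocking .get() in KubernetesHaServices#internalCleanupJobData is what lets a failed ConfigMap deletion propagate to AbstractHaServices#cleanupJobData as an exception. A self-contained sketch of the same pattern, with a hypothetical async store standing in for FlinkKubeClient:

    import java.util.concurrent.CompletableFuture;

    // Hypothetical stand-in for FlinkKubeClient#deleteConfigMap.
    interface AsyncConfigMapStore {
        CompletableFuture<Void> deleteConfigMap(String name);
    }

    class K8sJobCleanupSketch {
        private final AsyncConfigMapStore store;

        K8sJobCleanupSketch(AsyncConfigMapStore store) {
            this.store = store;
        }

        // Blocking on the future means a failed deletion surfaces to the
        // caller as an ExecutionException instead of being lost.
        void internalCleanupJobData(String jobManagerLeaderConfigMapName) throws Exception {
            store.deleteConfigMap(jobManagerLeaderConfigMapName).get();
        }
    }
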
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 7e2ff22..70bf6aa55 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -748,13 +748,14 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
     private void cleanUpJobData(JobID jobId, boolean cleanupHA) {
         jobManagerMetricGroup.removeJob(jobId);
 
-        boolean cleanupHABlobs = false;
+        boolean jobGraphRemoved = false;
         if (cleanupHA) {
             try {
                 jobGraphWriter.removeJobGraph(jobId);
 
-                // only clean up the HA blobs if we could remove the job from HA storage
-                cleanupHABlobs = true;
+                // only clean up the HA blobs and ha service data for the particular job
+                // if we could remove the job from HA storage
+                jobGraphRemoved = true;
             } catch (Exception e) {
                 log.warn(
                         "Could not properly remove job {} from submitted job graph store.",
@@ -770,6 +771,17 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
                         jobId,
                         e);
             }
+
+            if (jobGraphRemoved) {
+                try {
+                    highAvailabilityServices.cleanupJobData(jobId);
+                } catch (Exception e) {
+                    log.warn(
+                            "Could not properly clean data for job {} stored by ha services",
+                            jobId,
+                            e);
+                }
+            }
         } else {
             try {
                 jobGraphWriter.releaseJobGraph(jobId);
@@ -781,7 +793,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
             }
         }
 
-        blobServer.cleanupJob(jobId, cleanupHABlobs);
+        blobServer.cleanupJob(jobId, jobGraphRemoved);
     }
 
     /** Terminate all currently running {@link JobManagerRunner}s. */
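
The Dispatcher change hinges on ordering: per-job HA data (and HA blobs) may only be removed once the job graph itself was successfully removed, so that a half-failed cleanup still leaves enough state behind to recover the job. A condensed standalone sketch of that guard (the two interfaces are hypothetical stand-ins for JobGraphWriter and HighAvailabilityServices):

    // Hypothetical stand-ins for JobGraphWriter and HighAvailabilityServices.
    interface JobGraphStore {
        void removeJobGraph(String jobId) throws Exception;
    }

    interface HaBackend {
        void cleanupJobData(String jobId) throws Exception;
    }

    final class CleanupOrderingSketch {
        // HA data for a job is deleted only after the job graph was removed;
        // on failure, the data stays so the job remains recoverable.
        static void cleanUpJobData(String jobId, JobGraphStore jobGraphs, HaBackend ha) {
            boolean jobGraphRemoved = false;
            try {
                jobGraphs.removeJobGraph(jobId);
                jobGraphRemoved = true;
            } catch (Exception e) {
                System.err.println("Could not remove job graph for " + jobId + ": " + e);
            }
            if (jobGraphRemoved) {
                try {
                    ha.cleanupJobData(jobId);
                } catch (Exception e) {
                    System.err.println("Could not clean HA data for " + jobId + ": " + e);
                }
            }
        }
    }

DispatcherResourceCleanupTest below pins down exactly this behaviour: the cleanup future completes when the job finishes, and stays incomplete when the runner fails with JobNotFinishedException.
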
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
index 9a322a8..4c4dfe6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
@@ -205,6 +205,13 @@ public abstract class AbstractHaServices implements HighAvailabilityServices {
         logger.info("Finished cleaning up the high availability data.");
     }
 
+    @Override
+    public void cleanupJobData(JobID jobID) throws Exception {
+        logger.info("Clean up the high availability data for job {}.", jobID);
+        internalCleanupJobData(jobID);
+        logger.info("Finished cleaning up the high availability data for job {}.", jobID);
+    }
+
     /**
      * Create leader election service with specified leaderName.
      *
@@ -261,6 +268,15 @@ public abstract class AbstractHaServices implements HighAvailabilityServices {
     protected abstract void internalCleanup() throws Exception;
 
     /**
+     * Clean up the metadata in the distributed system (e.g. ZooKeeper, Kubernetes ConfigMaps)
+     * for the specified job.
+     *
+     * @param jobID The identifier of the job to clean up.
+     * @throws Exception if the cleanup operation on external storage fails.
+     */
+    protected abstract void internalCleanupJobData(JobID jobID) throws Exception;
+
+    /**
      * Get the leader name for ResourceManager.
      *
      * @return Return the ResourceManager leader name. It is ConfigMap name in Kubernetes or child
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index fdb3fa7..fdf031e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -237,4 +237,12 @@ public interface HighAvailabilityServices extends ClientHighAvailabilityServices
      *     up data stored by them.
      */
     void closeAndCleanupAllData() throws Exception;
+
+    /**
+     * Deletes all data for the specified job stored by these services in external stores.
+     *
+     * @param jobID The identifier of the job to clean up.
+     * @throws Exception Thrown, if an exception occurred while cleaning data stored by them.
+     */
+    void cleanupJobData(JobID jobID) throws Exception;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
index 188bdbb..dfc53dc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.highavailability.nonha;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.blob.BlobStore;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -111,6 +112,9 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices
         close();
     }
 
+    @Override
+    public void cleanupJobData(JobID jobID) throws Exception {}
+
     // ----------------------------------------------------------------------
     // Helper methods
     // ----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
index f6ab9e8..802162b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
@@ -34,10 +34,14 @@ import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.shaded.curator4.org.apache.curator.utils.ZKPaths;
 import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
+import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.data.Stat;
 
 import javax.annotation.Nonnull;
 
+import java.util.List;
 import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -139,6 +143,22 @@ public class ZooKeeperHaServices extends AbstractHaServices {
     }
 
     @Override
+    public void internalCleanupJobData(JobID jobID) throws Exception {
+        final List<String> paths =
+                Stream.of(
+                                HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH,
+                                HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH,
+                                HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH,
+                                HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH)
+                        .map(configuration::getString)
+                        .map(parent -> parent + "/" + jobID)
+                        .collect(Collectors.toList());
+        for (String path : paths) {
+            deleteZNode(path);
+        }
+    }
+
+    @Override
     protected String getLeaderNameForResourceManager() {
         return RESOURCE_MANAGER_LEADER_PATH;
     }
@@ -168,6 +188,10 @@ public class ZooKeeperHaServices extends AbstractHaServices {
     }
 
     private void deleteOwnedZNode() throws Exception {
+        deleteZNode("/");
+    }
+
+    private void deleteZNode(String path) throws Exception {
         // delete the HA_CLUSTER_ID znode which is owned by this cluster
 
         // Since we are using Curator version 2.12 there is a bug in deleting the children
@@ -176,13 +200,18 @@ public class ZooKeeperHaServices extends AbstractHaServices {
         // The retry logic can be removed once we upgrade to Curator version >= 4.0.1.
         boolean zNodeDeleted = false;
         while (!zNodeDeleted) {
+            Stat stat = client.checkExists().forPath(path);
+            if (stat == null) {
+                logger.debug("znode {} has been deleted", path);
+                return;
+            }
             try {
-                client.delete().deletingChildrenIfNeeded().forPath("/");
+                client.delete().deletingChildrenIfNeeded().forPath(path);
                 zNodeDeleted = true;
             } catch (KeeperException.NoNodeException ignored) {
                 // concurrent delete operation. Try again.
                 logger.debug(
-                        "Retrying to delete owned znode because of other concurrent delete operation.");
+                        "Retrying to delete znode because of other concurrent delete operation.");
             }
         }
     }
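
internalCleanupJobData derives one child path per job under each of the four configured parent paths and deletes it with deleteZNode. A standalone sketch of the same walk using plain (unshaded) Curator, with the option defaults assumed to be /checkpoints, /checkpoint-counter, /leaderlatch, and /leader; the real code reads the values from the Flink Configuration:

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;

    public class ZkJobCleanupSketch {
        public static void main(String[] args) throws Exception {
            String jobId = "d77948a27b049e2164b0e2ae05f0b01b"; // hypothetical job ID
            // Assumed defaults of the four HighAvailabilityOptions path settings.
            String[] parents = {"/checkpoints", "/checkpoint-counter", "/leaderlatch", "/leader"};

            CuratorFramework client = CuratorFrameworkFactory.newClient(
                    "localhost:2181", new ExponentialBackoffRetry(1000, 3));
            client.start();
            try {
                for (String parent : parents) {
                    String path = parent + "/" + jobId;
                    // Skip znodes that are already gone, as deleteZNode does.
                    if (client.checkExists().forPath(path) != null) {
                        client.delete().deletingChildrenIfNeeded().forPath(path);
                    }
                }
            } finally {
                client.close();
            }
        }
    }

The retry-on-NoNodeException loop in deleteZNode works around a child-deletion bug in the shaded Curator 2.12; as the in-code comment notes, it can go away once Flink is on Curator >= 4.0.1, which is why the sketch relies on a single deletingChildrenIfNeeded() call.
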
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index eb04d9d..dd80410 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -131,6 +131,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
     private CompletableFuture<BlobKey> storedHABlobFuture;
     private CompletableFuture<JobID> deleteAllHABlobsFuture;
     private CompletableFuture<JobID> cleanupJobFuture;
+    private CompletableFuture<JobID> cleanupJobHADataFuture;
     private JobGraphWriter jobGraphWriter = NoOpJobGraphWriter.INSTANCE;
 
     @BeforeClass
@@ -151,6 +152,8 @@ public class DispatcherResourceCleanupTest extends TestLogger {
         clearedJobLatch = new OneShotLatch();
         runningJobsRegistry = new SingleRunningJobsRegistry(jobId, clearedJobLatch);
         highAvailabilityServices.setRunningJobsRegistry(runningJobsRegistry);
+        cleanupJobHADataFuture = new CompletableFuture<>();
+        highAvailabilityServices.setCleanupJobDataFuture(cleanupJobHADataFuture);
 
         storedHABlobFuture = new CompletableFuture<>();
         deleteAllHABlobsFuture = new CompletableFuture<>();
@@ -456,6 +459,31 @@ public class DispatcherResourceCleanupTest extends TestLogger {
         assertThatHABlobsHaveBeenRemoved();
     }
 
+    @Test
+    public void testHaDataCleanupWhenJobFinished() throws Exception {
+        TestingJobManagerRunnerFactory jobManagerRunnerFactory = startDispatcherAndSubmitJob();
+        TestingJobManagerRunner jobManagerRunner =
+                jobManagerRunnerFactory.takeCreatedJobManagerRunner();
+        finishJob(jobManagerRunner);
+        JobID jobID = cleanupJobHADataFuture.get(2000, TimeUnit.MILLISECONDS);
+        assertThat(jobID, is(this.jobId));
+    }
+
+    @Test
+    public void testHaDataCleanupWhenJobNotFinished() throws Exception {
+        TestingJobManagerRunnerFactory jobManagerRunnerFactory = startDispatcherAndSubmitJob();
+        TestingJobManagerRunner jobManagerRunner =
+                jobManagerRunnerFactory.takeCreatedJobManagerRunner();
+        jobManagerRunner.completeResultFutureExceptionally(new JobNotFinishedException(jobId));
+        try {
+            cleanupJobHADataFuture.get(10L, TimeUnit.MILLISECONDS);
+            fail("We should not delete the HA data for job.");
+        } catch (TimeoutException ignored) {
+            // expected
+        }
+        assertThat(cleanupJobHADataFuture.isDone(), is(false));
+    }
+
     private void finishJob(TestingJobManagerRunner takeCreatedJobManagerRunner) {
         takeCreatedJobManagerRunner.completeResultFuture(
                 new ExecutionGraphInfo(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
index a90dc61..96e6daf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
@@ -37,9 +37,12 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.function.Consumer;
 
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
@@ -63,7 +66,8 @@ public class AbstractHaServicesTest extends TestLogger {
                         Executors.directExecutor(),
                         testingBlobStoreService,
                         closeOperations,
-                        () -> closeOperations.offer(CloseOperations.HA_CLEANUP));
+                        () -> closeOperations.offer(CloseOperations.HA_CLEANUP),
+                        ignored -> {});
 
         haServices.closeAndCleanupAllData();
 
@@ -94,7 +98,8 @@ public class AbstractHaServicesTest extends TestLogger {
                         closeOperations,
                         () -> {
                             throw new FlinkException("test exception");
-                        });
+                        },
+                        ignored -> {});
 
         try {
             haServices.closeAndCleanupAllData();
@@ -106,6 +111,29 @@ public class AbstractHaServicesTest extends TestLogger {
         assertThat(closeOperations, contains(CloseOperations.HA_CLOSE, CloseOperations.BLOB_CLOSE));
     }
 
+    @Test
+    public void testCleanupJobData() throws Exception {
+        final Queue<CloseOperations> closeOperations = new ArrayDeque<>(3);
+        final TestingBlobStoreService testingBlobStoreService =
+                new TestingBlobStoreService(closeOperations);
+
+        JobID jobID = new JobID();
+        CompletableFuture<JobID> jobCleanupFuture = new CompletableFuture<>();
+
+        final TestingHaServices haServices =
+                new TestingHaServices(
+                        new Configuration(),
+                        Executors.directExecutor(),
+                        testingBlobStoreService,
+                        closeOperations,
+                        () -> {},
+                        jobCleanupFuture::complete);
+
+        haServices.cleanupJobData(jobID);
+        JobID jobIDCleaned = jobCleanupFuture.get();
+        assertThat(jobIDCleaned, is(jobID));
+    }
+
     private enum CloseOperations {
         HA_CLEANUP,
         HA_CLOSE,
@@ -156,16 +184,19 @@ public class AbstractHaServicesTest extends TestLogger {
 
         private final Queue<? super CloseOperations> closeOperations;
         private final RunnableWithException internalCleanupRunnable;
+        private final Consumer<JobID> internalJobCleanupConsumer;
 
         private TestingHaServices(
                 Configuration config,
                 Executor ioExecutor,
                 BlobStoreService blobStoreService,
                 Queue<? super CloseOperations> closeOperations,
-                RunnableWithException internalCleanupRunnable) {
+                RunnableWithException internalCleanupRunnable,
+                Consumer<JobID> internalJobCleanupConsumer) {
             super(config, ioExecutor, blobStoreService);
             this.closeOperations = closeOperations;
             this.internalCleanupRunnable = internalCleanupRunnable;
+            this.internalJobCleanupConsumer = internalJobCleanupConsumer;
         }
 
         @Override
@@ -204,6 +235,11 @@ public class AbstractHaServicesTest extends TestLogger {
         }
 
         @Override
+        protected void internalCleanupJobData(JobID jobID) throws Exception {
+            internalJobCleanupConsumer.accept(jobID);
+        }
+
+        @Override
         protected String getLeaderNameForResourceManager() {
             throw new UnsupportedOperationException("Not supported by this test implementation.");
         }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 0afc981..4434eef 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 
 import java.io.IOException;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
@@ -72,6 +73,8 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 
     private CompletableFuture<Void> closeAndCleanupAllDataFuture = new CompletableFuture<>();
 
+    private volatile CompletableFuture<JobID> jobCleanupFuture;
+
     // ------------------------------------------------------------------------
     //  Setters for mock / testing implementations
     // ------------------------------------------------------------------------
@@ -145,6 +148,10 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
         this.closeAndCleanupAllDataFuture = closeAndCleanupAllDataFuture;
     }
 
+    public void setCleanupJobDataFuture(CompletableFuture<JobID> jobCleanupFuture) {
+        this.jobCleanupFuture = jobCleanupFuture;
+    }
+
     // ------------------------------------------------------------------------
     //  HA Services Methods
     // ------------------------------------------------------------------------
@@ -277,4 +284,9 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
     public void closeAndCleanupAllData() throws Exception {
         closeAndCleanupAllDataFuture.complete(null);
     }
+
+    @Override
+    public void cleanupJobData(JobID jobID) {
+        Optional.ofNullable(jobCleanupFuture).ifPresent(f -> f.complete(jobID));
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java
index 6cbb92c..912b48f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java
@@ -136,6 +136,11 @@ public class TestingManualHighAvailabilityServices implements HighAvailabilitySe
         // nothing to do
     }
 
+    @Override
+    public void cleanupJobData(JobID jobID) throws Exception {
+        // nothing to do
+    }
+
     public void grantLeadership(JobID jobId, int index, UUID leaderId) {
         ManualLeaderService manualLeaderService = jobManagerLeaderServices.get(jobId);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java
index 81dc6b4..32601b9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java
@@ -49,9 +49,12 @@ import javax.annotation.Nonnull;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
@@ -148,6 +151,41 @@ public class ZooKeeperHaServicesTest extends TestLogger {
         assertThat(client.checkExists().forPath(unclePath), is(notNullValue()));
     }
 
+    /** Tests that the ZooKeeperHaServices cleans up the job-specific paths. */
+    @Test
+    public void testCleanupJobData() throws Exception {
+        String rootPath = "/foo/bar/flink";
+        final Configuration configuration = createConfiguration(rootPath);
+        String namespace = configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
+
+        final List<String> paths =
+                Stream.of(
+                                HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH,
+                                HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH)
+                        .map(configuration::getString)
+                        .map(path -> rootPath + namespace + path)
+                        .collect(Collectors.toList());
+
+        final TestingBlobStoreService blobStoreService = new TestingBlobStoreService();
+
+        JobID jobID = new JobID();
+        runCleanupTestWithJob(
+                configuration,
+                blobStoreService,
+                jobID,
+                haServices -> {
+                    for (String path : paths) {
+                        final List<String> children = client.getChildren().forPath(path);
+                        assertThat(children, hasItem(jobID.toString()));
+                    }
+                    haServices.cleanupJobData(jobID);
+                    for (String path : paths) {
+                        final List<String> children = client.getChildren().forPath(path);
+                        assertThat(children, not(hasItem(jobID.toString())));
+                    }
+                });
+    }
+
     private static CuratorFramework startCuratorFramework() {
         return CuratorFrameworkFactory.builder()
                 .connectString(ZOO_KEEPER_RESOURCE.getConnectString())
@@ -170,6 +208,16 @@ public class ZooKeeperHaServicesTest extends TestLogger {
             TestingBlobStoreService blobStoreService,
             ThrowingConsumer<ZooKeeperHaServices, Exception> zooKeeperHaServicesConsumer)
             throws Exception {
+        runCleanupTestWithJob(
+                configuration, blobStoreService, new JobID(), zooKeeperHaServicesConsumer);
+    }
+
+    private void runCleanupTestWithJob(
+            Configuration configuration,
+            TestingBlobStoreService blobStoreService,
+            JobID jobId,
+            ThrowingConsumer<ZooKeeperHaServices, Exception> zooKeeperHaServicesConsumer)
+            throws Exception {
         try (ZooKeeperHaServices zooKeeperHaServices =
                 new ZooKeeperHaServices(
                         ZooKeeperUtils.startCuratorFramework(configuration),
@@ -190,13 +238,23 @@ public class ZooKeeperHaServicesTest extends TestLogger {
             resourceManagerLeaderRetriever.start(listener);
             resourceManagerLeaderElectionService.start(
                     new TestingContender("foobar", resourceManagerLeaderElectionService));
-            final JobID jobId = new JobID();
+            LeaderElectionService jobManagerLeaderElectionService =
+                    zooKeeperHaServices.getJobManagerLeaderElectionService(jobId);
+            jobManagerLeaderElectionService.start(
+                    new TestingContender("", jobManagerLeaderElectionService));
+            LeaderRetrievalService jobManagerLeaderRetriever =
+                    zooKeeperHaServices.getJobManagerLeaderRetriever(jobId);
+            jobManagerLeaderRetriever.start(
+                    new LeaderRetrievalUtils.LeaderConnectionInfoListener());
+
             runningJobsRegistry.setJobRunning(jobId);
 
             listener.getLeaderConnectionInfoFuture().join();
 
             resourceManagerLeaderRetriever.stop();
             resourceManagerLeaderElectionService.stop();
+            jobManagerLeaderRetriever.stop();
+            jobManagerLeaderElectionService.stop();
             runningJobsRegistry.clearJob(jobId);
 
             zooKeeperHaServicesConsumer.accept(zooKeeperHaServices);