You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/03/19 14:05:31 UTC

[flink] branch release-1.15 updated (70c4ce9 -> 9ed108b)

This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 70c4ce9  [hotfix][release] Fix the broken doc config and the script to generate the change
     new 3966744  [FLINK-26690][runtime] Makes globalCleanupAsync call the removal even if the JobGraph is not put into the JobGraphStore, yet
     new 9ed108b  [hotfix][docs] Aligns JavaDoc with method signature

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../KubernetesStateHandleStore.java                |  9 ++++++--
 .../KubernetesStateHandleStoreTest.java            | 27 ++++++++++++++++++++++
 .../runtime/jobmanager/DefaultJobGraphStore.java   |  8 +++----
 .../runtime/persistence/StateHandleStore.java      |  6 ++---
 .../jobmanager/DefaultJobGraphStoreTest.java       | 27 ++++++++++++++--------
 .../zookeeper/ZooKeeperStateHandleStoreTest.java   | 12 ++++++++++
 6 files changed, 70 insertions(+), 19 deletions(-)

[flink] 02/02: [hotfix][docs] Aligns JavaDoc with method signature

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9ed108b55d7092ffac4f3fc66b72f744facfce14
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Wed Mar 16 19:39:28 2022 +0100

    [hotfix][docs] Aligns JavaDoc with method signature
---
 .../java/org/apache/flink/runtime/persistence/StateHandleStore.java    | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/StateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/StateHandleStore.java
index f8e6d69..6c9f521 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/StateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/StateHandleStore.java
@@ -117,8 +117,7 @@ public interface StateHandleStore<T extends Serializable, R extends ResourceVers
 
     /**
      * Releases the lock for the given state handle and tries to remove the state handle if it is no
-     * longer locked. It returns the {@link RetrievableStateHandle} stored under the given state
-     * node if any. Also the state on the external storage will be discarded.
+     * longer locked. Also the state on the external storage will be discarded.
      *
      * @param name Key name in ConfigMap or child path name in ZooKeeper
      * @return {@code true} if the state handle is removed (also if it didn't exist in the first

[flink] 01/02: [FLINK-26690][runtime] Makes globalCleanupAsync call the removal even if the JobGraph is not put into the JobGraphStore, yet

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3966744a18d9f0e408474969c1f61ea43c8b41d5
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Wed Mar 16 18:09:01 2022 +0100

    [FLINK-26690][runtime] Makes globalCleanupAsync call the removal even if the JobGraph is not put into the JobGraphStore, yet
    
    This can happen if cleanup is triggered after a
    failover of a dirty JobResultStore entry (i.e. of
    a globally-terminated job). In that case, no
    recovery of the JobGraph happens and, therefore, no
    JobGraph is added to the internal addedJobGraphs
    collection.
    
    This required KubernetesStateHandleStore.releaseAndTryRemove
    to work for non-existing state as well.
    
    ZooKeeperStateHandleStore.releaseAndTryRemove is already
    idempotent in this respect.
---
 .../KubernetesStateHandleStore.java                |  9 ++++++--
 .../KubernetesStateHandleStoreTest.java            | 27 ++++++++++++++++++++++
 .../runtime/jobmanager/DefaultJobGraphStore.java   |  8 +++----
 .../runtime/persistence/StateHandleStore.java      |  3 ++-
 .../jobmanager/DefaultJobGraphStoreTest.java       | 27 ++++++++++++++--------
 .../zookeeper/ZooKeeperStateHandleStoreTest.java   | 12 ++++++++++
 6 files changed, 69 insertions(+), 17 deletions(-)

diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
index cc7153d..0716b58 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
@@ -51,6 +51,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -464,13 +465,14 @@ public class KubernetesStateHandleStore<T extends Serializable>
      * It returns the {@link RetrievableStateHandle} stored under the given state node if any.
      *
      * @param key Key to be removed from ConfigMap
-     * @return True if the state handle is removed successfully
+     * @return True if the state handle isn't listed anymore.
      * @throws Exception if removing the key or discarding the state failed
      */
     @Override
     public boolean releaseAndTryRemove(String key) throws Exception {
         checkNotNull(key, "Key in ConfigMap.");
         final AtomicReference<RetrievableStateHandle<T>> stateHandleRefer = new AtomicReference<>();
+        final AtomicBoolean stateHandleDoesNotExist = new AtomicBoolean(false);
         return updateConfigMap(
                         configMap -> {
                             final String content = configMap.getData().get(key);
@@ -496,6 +498,8 @@ public class KubernetesStateHandleStore<T extends Serializable>
                                     Objects.requireNonNull(configMap.getData().remove(key));
                                 }
                                 return Optional.of(configMap);
+                            } else {
+                                stateHandleDoesNotExist.set(true);
                             }
                             return Optional.empty();
                         })
@@ -516,7 +520,8 @@ public class KubernetesStateHandleStore<T extends Serializable>
                                     throw new CompletionException(e);
                                 }
                             }
-                            return CompletableFuture.completedFuture(updated);
+                            return CompletableFuture.completedFuture(
+                                    stateHandleDoesNotExist.get() || updated);
                         })
                 .get();
     }
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
index 9896d49..51294f4 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStoreTest.java
@@ -804,6 +804,33 @@ public class KubernetesStateHandleStoreTest extends KubernetesHighAvailabilityTe
     }
 
     @Test
+    public void testRemoveOfNonExistingState() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final KubernetesStateHandleStore<
+                                            TestingLongStateHandleHelper.LongStateHandle>
+                                    store =
+                                            new KubernetesStateHandleStore<>(
+                                                    flinkKubeClient,
+                                                    LEADER_CONFIGMAP_NAME,
+                                                    longStateStorage,
+                                                    filter,
+                                                    LOCK_IDENTITY);
+                            assertThat(store.getAllAndLock().size(), is(0));
+                            assertThat(store.releaseAndTryRemove(key), is(true));
+                            assertThat(store.getAllAndLock().size(), is(0));
+
+                            assertThat(TestingLongStateHandleHelper.getGlobalDiscardCount(), is(0));
+                        });
+            }
+        };
+    }
+
+    @Test
     public void testRemoveFailedShouldNotDiscardState() throws Exception {
         new Context() {
             {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java
index fda4964..e66a172 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java
@@ -249,12 +249,10 @@ public class DefaultJobGraphStore<R extends ResourceVersion<R>>
                 () -> {
                     LOG.debug("Removing job graph {} from {}.", jobId, jobGraphStateHandleStore);
 
-                    if (addedJobGraphs.contains(jobId)) {
-                        final String name = jobGraphStoreUtil.jobIDToName(jobId);
-                        releaseAndRemoveOrThrowCompletionException(jobId, name);
+                    final String name = jobGraphStoreUtil.jobIDToName(jobId);
+                    releaseAndRemoveOrThrowCompletionException(jobId, name);
 
-                        addedJobGraphs.remove(jobId);
-                    }
+                    addedJobGraphs.remove(jobId);
 
                     LOG.info("Removed job graph {} from {}.", jobId, jobGraphStateHandleStore);
                 },
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/StateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/StateHandleStore.java
index 5cbfeac..f8e6d69 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/StateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/persistence/StateHandleStore.java
@@ -121,7 +121,8 @@ public interface StateHandleStore<T extends Serializable, R extends ResourceVers
      * node if any. Also the state on the external storage will be discarded.
      *
      * @param name Key name in ConfigMap or child path name in ZooKeeper
-     * @return True if the state handle could be removed.
+     * @return {@code true} if the state handle is removed (also if it didn't exist in the first
+     *     place); otherwise {@code false}.
      * @throws Exception if releasing, removing the handles or discarding the state failed
      */
     boolean releaseAndTryRemove(String name) throws Exception;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java
index d45a39a..1343e7f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java
@@ -41,8 +41,8 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
@@ -51,6 +51,7 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.fail;
 
 /**
@@ -220,14 +221,22 @@ public class DefaultJobGraphStoreTest extends TestLogger {
                 .globalCleanupAsync(testingJobGraph.getJobID(), Executors.directExecutor())
                 .join();
 
-        try {
-            removeFuture.get(timeout, TimeUnit.MILLISECONDS);
-            fail(
-                    "We should get an expected timeout because we are removing a non-existed job graph.");
-        } catch (TimeoutException ex) {
-            // expected
-        }
-        assertThat(removeFuture.isDone(), is(false));
+        assertThat(removeFuture.isDone(), is(true));
+    }
+
+    @Test
+    public void testGlobalCleanupFailsIfRemovalReturnsFalse() throws Exception {
+        final TestingStateHandleStore<JobGraph> stateHandleStore =
+                builder.setRemoveFunction(name -> false).build();
+
+        final JobGraphStore jobGraphStore = createAndStartJobGraphStore(stateHandleStore);
+        assertThrows(
+                ExecutionException.class,
+                () ->
+                        jobGraphStore
+                                .globalCleanupAsync(
+                                        testingJobGraph.getJobID(), Executors.directExecutor())
+                                .get());
     }
 
     @Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
index dc81131..ddf6fc7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
@@ -224,6 +224,18 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger {
     }
 
     @Test
+    public void testCleanupOfNonExistingState() throws Exception {
+        final ZooKeeperStateHandleStore<TestingLongStateHandleHelper.LongStateHandle> testInstance =
+                new ZooKeeperStateHandleStore<>(
+                        ZOOKEEPER.getClient(), new TestingLongStateHandleHelper());
+
+        final String pathInZooKeeper = "/testCleanupOfNonExistingState";
+
+        assertTrue(testInstance.releaseAndTryRemove(pathInZooKeeper));
+        assertFalse(testInstance.exists(pathInZooKeeper).isExisting());
+    }
+
+    @Test
     public void testRepeatableCleanupWithLockOnNode() throws Exception {
         final CuratorFramework client =
                 ZooKeeperUtils.useNamespaceAndEnsurePath(