Posted to commits@flink.apache.org by ma...@apache.org on 2022/03/16 18:25:12 UTC

[flink] branch master updated (5cb08e4 -> 9b73eba)

This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 5cb08e4  [hotfix][ci] Return tools/ci/docs.sh
     new dfdc36c  [FLINK-26652][runtime] Makes the cleanup not fail fatally
     new 0e96823  [hotfix][docs] Updates the default value from the fixed delay strategy
     new 0f870d5  [hotfix][docs] Uses @OverrideDefault instead of noDefaultValue for exponential-delay.attempts
     new 21e16e1  [hotfix][runtime][test] Improves assert message
     new 58d1781  [hotfix][docs] Adds missing JRS configuration parameter in Chinese documentation
     new 9b73eba  [hotfix][tests] Fixed wrong default value in test

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/content.zh/docs/deployment/config.md          |   4 +
 docs/content.zh/docs/deployment/ha/overview.md     |  20 ++--
 docs/content.zh/docs/deployment/overview.md        |   5 +
 docs/content/docs/deployment/config.md             |   2 +-
 docs/content/docs/deployment/ha/overview.md        |  20 ++--
 docs/content/docs/deployment/overview.md           |   7 +-
 .../generated/cleanup_configuration.html           |   2 +-
 ...ntial_delay_cleanup_strategy_configuration.html |   4 +-
 ...fixed_delay_cleanup_strategy_configuration.html |   6 +-
 .../apache/flink/configuration/CleanupOptions.java |  32 ++++--
 .../flink/runtime/dispatcher/Dispatcher.java       |  19 +++-
 .../cleanup/CleanupRetryStrategyFactory.java       |   4 +-
 .../dispatcher/DispatcherCleanupITCase.java        | 117 ++++++++++-----------
 .../cleanup/CleanupRetryStrategyFactoryTest.java   |  23 ++--
 14 files changed, 157 insertions(+), 108 deletions(-)

[flink] 06/06: [hotfix][tests] Fixed wrong default value in test

Posted by ma...@apache.org.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9b73ebad3c18b1487c7dc54d672ffe9aead54e65
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Tue Mar 15 21:22:28 2022 +0100

    [hotfix][tests] Fixed wrong default value in test
---
 .../runtime/dispatcher/cleanup/CleanupRetryStrategyFactoryTest.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CleanupRetryStrategyFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CleanupRetryStrategyFactoryTest.java
index c1dc76b..cd3d0a5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CleanupRetryStrategyFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CleanupRetryStrategyFactoryTest.java
@@ -217,7 +217,7 @@ class CleanupRetryStrategyFactoryTest {
 
         testExponentialBackoffDelayRetryStrategyCreation(
                 config,
-                CleanupOptions.CLEANUP_STRATEGY_FIXED_DELAY_DELAY.defaultValue(),
+                CleanupOptions.CLEANUP_STRATEGY_EXPONENTIAL_DELAY_INITIAL_BACKOFF.defaultValue(),
                 CleanupOptions.CLEANUP_STRATEGY_EXPONENTIAL_DELAY_MAX_BACKOFF.defaultValue(),
                 customMaxAttempts);
     }

[flink] 02/06: [hotfix][docs] Updates the default value from the fixed delay strategy

Posted by ma...@apache.org.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0e968232610861daf1fd9c911996266b9916bd46
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Tue Mar 15 16:14:59 2022 +0100

    [hotfix][docs] Updates the default value from the fixed delay strategy
    
    We want to try infinitely if nothing is specified.
---
 .../generated/fixed_delay_cleanup_strategy_configuration.html     | 4 ++--
 .../main/java/org/apache/flink/configuration/CleanupOptions.java  | 5 +++--
 .../dispatcher/cleanup/CleanupRetryStrategyFactoryTest.java       | 8 ++++++--
 3 files changed, 11 insertions(+), 6 deletions(-)
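
Note on the changed default: the diff below moves the fixed-delay attempts default from 1 to Integer.MAX_VALUE (documented as "infinite") and replaces the test's "default + 2" custom value with a literal 1 plus a precondition. A plausible reason, not stated in the commit message, is that adding 2 to Integer.MAX_VALUE overflows to a negative number. The following JDK-only sketch illustrates this; the class, constant, and method names are hypothetical and do not come from Flink.

```java
import java.time.Duration;

public class FixedDelayDefaultsSketch {

    // Hypothetical constants mirroring the new defaults shown in the diff.
    static final int DEFAULT_ATTEMPTS = Integer.MAX_VALUE; // documented as "infinite"
    static final Duration DEFAULT_DELAY = Duration.ofMinutes(1);

    // Returns the configured attempts, or the default when nothing was configured (null).
    static int effectiveAttempts(Integer configured) {
        return configured != null ? configured : DEFAULT_ATTEMPTS;
    }

    // Guard in the spirit of the test's precondition: a custom value only
    // exercises the override path if it differs from the default.
    static int requireNonDefault(int customAttempts) {
        if (customAttempts == DEFAULT_ATTEMPTS) {
            throw new IllegalArgumentException(
                    "The custom value must differ from the default value.");
        }
        return customAttempts;
    }

    public static void main(String[] args) {
        // The old test pattern "default + 2" wraps around with the new default.
        int overflowed = DEFAULT_ATTEMPTS + 2;
        System.out.println("default + 2 is negative: " + (overflowed < 0));

        // A small literal such as 1 is a safe custom value.
        int custom = requireNonDefault(1);
        System.out.println("effective attempts with override: " + effectiveAttempts(custom));
        System.out.println("default delay: " + DEFAULT_DELAY);
    }
}
```

The guard fails fast if a later change to the default happens to collide with the custom value used in the test.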

diff --git a/docs/layouts/shortcodes/generated/fixed_delay_cleanup_strategy_configuration.html b/docs/layouts/shortcodes/generated/fixed_delay_cleanup_strategy_configuration.html
index 6312784..231fc9d 100644
--- a/docs/layouts/shortcodes/generated/fixed_delay_cleanup_strategy_configuration.html
+++ b/docs/layouts/shortcodes/generated/fixed_delay_cleanup_strategy_configuration.html
@@ -10,13 +10,13 @@
     <tbody>
         <tr>
             <td><h5>cleanup-strategy.fixed-delay.attempts</h5></td>
-            <td style="word-wrap: break-word;">1</td>
+            <td style="word-wrap: break-word;">infinite</td>
             <td>Integer</td>
             <td>The number of times that Flink retries the cleanup before giving up if <code class="highlighter-rouge">cleanup-strategy</code> has been set to <code class="highlighter-rouge">fixed-delay</code>. Reaching the configured limit means that the job artifacts (and the job's JobResultStore entry) might need to be cleaned up manually.</td>
         </tr>
         <tr>
             <td><h5>cleanup-strategy.fixed-delay.delay</h5></td>
-            <td style="word-wrap: break-word;">1 s</td>
+            <td style="word-wrap: break-word;">1 min</td>
             <td>Duration</td>
             <td>Amount of time that Flink waits before re-triggering the cleanup after a failed attempt if the <code class="highlighter-rouge">cleanup-strategy</code> is set to <code class="highlighter-rouge">fixed-delay</code>. It can be specified using the following notation: "1 min", "20 s"</td>
         </tr>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CleanupOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CleanupOptions.java
index 31268ef..7c6ff64 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CleanupOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CleanupOptions.java
@@ -131,10 +131,11 @@ public class CleanupOptions {
                                                     + "retry strategy with the given default values.")
                                     .build());
 
+    @Documentation.OverrideDefault("infinite")
     public static final ConfigOption<Integer> CLEANUP_STRATEGY_FIXED_DELAY_ATTEMPTS =
             ConfigOptions.key(createFixedDelayParameterPrefix("attempts"))
                     .intType()
-                    .defaultValue(1)
+                    .defaultValue(Integer.MAX_VALUE)
                     .withDescription(
                             Description.builder()
                                     .text(
@@ -149,7 +150,7 @@ public class CleanupOptions {
     public static final ConfigOption<Duration> CLEANUP_STRATEGY_FIXED_DELAY_DELAY =
             ConfigOptions.key(createFixedDelayParameterPrefix("delay"))
                     .durationType()
-                    .defaultValue(Duration.ofSeconds(1))
+                    .defaultValue(Duration.ofMinutes(1))
                     .withDescription(
                             Description.builder()
                                     .text(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CleanupRetryStrategyFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CleanupRetryStrategyFactoryTest.java
index ed6ef75..9cf1db6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CleanupRetryStrategyFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CleanupRetryStrategyFactoryTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.dispatcher.cleanup;
 
 import org.apache.flink.configuration.CleanupOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.ExponentialBackoffRetryStrategy;
 import org.apache.flink.util.concurrent.FixedRetryStrategy;
 import org.apache.flink.util.concurrent.RetryStrategy;
@@ -115,8 +116,11 @@ class CleanupRetryStrategyFactoryTest {
     public void testFixedDelayStrategyWithCustomMaxAttempts() {
         final Configuration config =
                 createConfigurationWithRetryStrategy(CleanupOptions.FIXED_DELAY_LABEL);
-        final int customMaxAttempts =
-                CleanupOptions.CLEANUP_STRATEGY_FIXED_DELAY_ATTEMPTS.defaultValue() + 2;
+        final int customMaxAttempts = 1;
+        Preconditions.checkArgument(
+                customMaxAttempts
+                        != CleanupOptions.CLEANUP_STRATEGY_FIXED_DELAY_ATTEMPTS.defaultValue(),
+                "The custom value should be different from the default value to make it possible that the overwritten value is selected.");
         config.set(CleanupOptions.CLEANUP_STRATEGY_FIXED_DELAY_ATTEMPTS, customMaxAttempts);
 
         testFixedDelayStrategyCreation(

[flink] 04/06: [hotfix][runtime][test] Improves assert message

Posted by ma...@apache.org.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 21e16e12cc94ee80f8418ca9d122683bf26e8ccd
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Tue Mar 15 20:12:00 2022 +0100

    [hotfix][runtime][test] Improves assert message
---
 .../org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
index 60312f1..4b4ce27 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
@@ -323,7 +323,7 @@ public class DispatcherCleanupITCase extends AbstractDispatcherTest {
                 haServices.getJobGraphStore().getJobIds(),
                 equalTo(Collections.singleton(jobId)));
         assertThat(
-                "The JobResultStore has this job marked as dirty.",
+                "The JobResultStore should have this job marked as dirty.",
                 haServices.getJobResultStore().getDirtyResults().stream()
                         .map(JobResult::getJobId)
                         .collect(Collectors.toSet()),

[flink] 01/06: [FLINK-26652][runtime] Makes the cleanup not fail fatally

Posted by ma...@apache.org.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dfdc36c9223a922834c3bae403fa983a218b0ad0
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Tue Mar 15 15:59:08 2022 +0100

    [FLINK-26652][runtime] Makes the cleanup not fail fatally
    
    The user explicitly marked the cleanup retry logic to
    terminate after a certain amount of attempts. This should be
    considered as desired behavior and shouldn't make the cluster
    fail fatally.
---
 docs/content.zh/docs/deployment/ha/overview.md     |  20 ++--
 docs/content.zh/docs/deployment/overview.md        |   5 +
 docs/content/docs/deployment/ha/overview.md        |  20 ++--
 docs/content/docs/deployment/overview.md           |   7 +-
 .../generated/cleanup_configuration.html           |   2 +-
 ...ntial_delay_cleanup_strategy_configuration.html |   2 +-
 ...fixed_delay_cleanup_strategy_configuration.html |   2 +-
 .../apache/flink/configuration/CleanupOptions.java |  24 +++--
 .../flink/runtime/dispatcher/Dispatcher.java       |  19 +++-
 .../dispatcher/DispatcherCleanupITCase.java        | 115 ++++++++++-----------
 10 files changed, 126 insertions(+), 90 deletions(-)
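
Note on the Dispatcher change: the diff below wraps the removeJob(...) future in exceptionally(...), logs a warning, and returns null, so the job termination future completes normally instead of surfacing the failure as a fatal error. The following JDK-only sketch shows that pattern in isolation. It is not the Flink code; the class name, the removeJob stand-in, and the AtomicBoolean used in place of a logger are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class CleanupFailureSketch {

    // Hypothetical stand-in for the job removal step; fails when `fail` is true.
    static CompletableFuture<Void> removeJob(boolean fail) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (fail) {
            result.completeExceptionally(
                    new RuntimeException("cleanup retries exhausted"));
        } else {
            result.complete(null);
        }
        return result;
    }

    // Records a warning and swallows the failure, so the returned future
    // completes normally even if the cleanup failed.
    static CompletableFuture<Void> removeJobWithoutFatalError(
            boolean fail, AtomicBoolean warned) {
        return removeJob(fail)
                .exceptionally(
                        throwable -> {
                            warned.set(true);
                            System.err.println(
                                    "Cleanup failed; manual cleanup is needed: "
                                            + throwable.getMessage());
                            return null;
                        });
    }

    public static void main(String[] args) {
        AtomicBoolean warned = new AtomicBoolean(false);
        CompletableFuture<Void> termination = removeJobWithoutFatalError(true, warned);
        System.out.println(
                "completed exceptionally: " + termination.isCompletedExceptionally());
        System.out.println("warning recorded: " + warned.get());
    }
}
```

Because the recovered future no longer carries the exception, any handler attached afterwards for uncaught failures is not triggered by an exhausted cleanup retry.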

diff --git a/docs/content.zh/docs/deployment/ha/overview.md b/docs/content.zh/docs/deployment/ha/overview.md
index c7e516b..0cfcd30 100644
--- a/docs/content.zh/docs/deployment/ha/overview.md
+++ b/docs/content.zh/docs/deployment/ha/overview.md
@@ -76,14 +76,16 @@ Flink provides two high-availability service implementations:
 
 ## JobResultStore
 
-In order to preserve a job's scheduling status across failover events and prevent erroneous
-re-execution of globally terminated (i.e. finished, cancelled or failed) jobs, Flink persists
-status of terminated jobs to a filesystem using the JobResultStore.
-The JobResultStore allows job results to outlive a finished job, and can be used by
-Flink components involved in the recovery of a highly-available cluster in order to
-determine whether a job should be subject to recovery.
-
-The JobResultStore has sensible defaults for its behaviour, such as result storage
-location, but these can be [configured]({{< ref "docs/deployment/config#high-availability" >}}).
+The JobResultStore is used to archive the final result of a job that reached a globally-terminal
+state (i.e. finished, cancelled or failed). The data is stored on a file system (see
+[job-result-store.storage-path]({{< ref "docs/deployment/config#job-result-store-storage-path" >}})).
+Entries in this store are marked as dirty as long as the corresponding job wasn't cleaned up properly
+(artifacts are found in the job's subfolder in [high-availability.storageDir]({{< ref "docs/deployment/config#high-availability-storagedir" >}})).
+
+Dirty entries are subject to cleanup, i.e. the corresponding job is either cleaned up by Flink at
+the moment or will be picked up for cleanup as part of a recovery. The entries will be deleted as
+soon as the cleanup succeeds. Check the JobResultStore configuration parameters under
+[HA configuration options]({{< ref "docs/deployment/config#high-availability" >}}) for further
+details on how to adapt the behavior.
 
 {{< top >}}
diff --git a/docs/content.zh/docs/deployment/overview.md b/docs/content.zh/docs/deployment/overview.md
index df6abea..f10dab3 100644
--- a/docs/content.zh/docs/deployment/overview.md
+++ b/docs/content.zh/docs/deployment/overview.md
@@ -158,6 +158,11 @@ Once a job has reached a globally terminal state of either finished, failed or c
 external component resources associated with the job are then cleaned up. In the event of a
 failure when cleaning up a resource, Flink will attempt to retry the cleanup. You can
 [configure]({{< ref "docs/deployment/config#retryable-cleanup" >}}) the retry strategy used.
+Reaching the maximum number of retries without succeeding will leave the job in a dirty state.
+Its artifacts would need to be cleaned up manually (see the
+[High Availability Services / JobResultStore]({{< ref "docs/deployment/ha/overview#jobresultstore" >}})
+section for further details). Restarting the very same job (i.e. using the same
+job ID) will result in the cleanup being restarted without running the job again.
 
 There is currently an issue with the cleanup of CompletedCheckpoints that failed to be deleted
 while subsuming them as part of the usual CompletedCheckpoint management. These artifacts are
diff --git a/docs/content/docs/deployment/ha/overview.md b/docs/content/docs/deployment/ha/overview.md
index 36cdcb4..1939474 100644
--- a/docs/content/docs/deployment/ha/overview.md
+++ b/docs/content/docs/deployment/ha/overview.md
@@ -82,14 +82,16 @@ Once this happens, all the HA data, including the metadata stored in the HA serv
 
 ## JobResultStore
 
-In order to preserve a job's scheduling status across failover events and prevent erroneous
-re-execution of globally terminated (i.e. finished, cancelled or failed) jobs, Flink persists
-status of terminated jobs to a filesystem using the JobResultStore.
-The JobResultStore allows job results to outlive a finished job, and can be used by
-Flink components involved in the recovery of a highly-available cluster in order to
-determine whether a job should be subject to recovery.
-
-The JobResultStore has sensible defaults for its behaviour, such as result storage
-location, but these can be [configured]({{< ref "docs/deployment/config#high-availability" >}}).
+The JobResultStore is used to archive the final result of a job that reached a globally-terminal 
+state (i.e. finished, cancelled or failed). The data is stored on a file system (see 
+[job-result-store.storage-path]({{< ref "docs/deployment/config#job-result-store-storage-path" >}})).
+Entries in this store are marked as dirty as long as the corresponding job wasn't cleaned up properly 
+(artifacts are found in the job's subfolder in [high-availability.storageDir]({{< ref "docs/deployment/config#high-availability-storagedir" >}})).
+
+Dirty entries are subject to cleanup, i.e. the corresponding job is either cleaned up by Flink at 
+the moment or will be picked up for cleanup as part of a recovery. The entries will be deleted as 
+soon as the cleanup succeeds. Check the JobResultStore configuration parameters under 
+[HA configuration options]({{< ref "docs/deployment/config#high-availability" >}}) for further 
+details on how to adapt the behavior.
 
 {{< top >}}
diff --git a/docs/content/docs/deployment/overview.md b/docs/content/docs/deployment/overview.md
index 64bac26..247605e 100644
--- a/docs/content/docs/deployment/overview.md
+++ b/docs/content/docs/deployment/overview.md
@@ -158,7 +158,12 @@ When deploying Flink, there are often multiple options available for each buildi
 Once a job has reached a globally terminal state of either finished, failed or cancelled, the
 external component resources associated with the job are then cleaned up. In the event of a
 failure when cleaning up a resource, Flink will attempt to retry the cleanup. You can
-[configure]({{< ref "docs/deployment/config#retryable-cleanup" >}}) the retry strategy used.
+[configure]({{< ref "docs/deployment/config#retryable-cleanup" >}}) the retry strategy used. 
+Reaching the maximum number of retries without succeeding will leave the job in a dirty state. 
+Its artifacts would need to be cleaned up manually (see the 
+[High Availability Services / JobResultStore]({{< ref "docs/deployment/ha/overview#jobresultstore" >}})
+section for further details). Restarting the very same job (i.e. using the same 
+job ID) will result in the cleanup being restarted without running the job again.
 
 There is currently an issue with the cleanup of CompletedCheckpoints that failed to be deleted 
 while subsuming them as part of the usual CompletedCheckpoint management. These artifacts are 
diff --git a/docs/layouts/shortcodes/generated/cleanup_configuration.html b/docs/layouts/shortcodes/generated/cleanup_configuration.html
index 02e8618..e0663c3 100644
--- a/docs/layouts/shortcodes/generated/cleanup_configuration.html
+++ b/docs/layouts/shortcodes/generated/cleanup_configuration.html
@@ -12,7 +12,7 @@
             <td><h5>cleanup-strategy</h5></td>
             <td style="word-wrap: break-word;">"exponential-delay"</td>
             <td>String</td>
-            <td>Defines the cleanup strategy to use in case of cleanup failures.<br />Accepted values are:<ul><li><code class="highlighter-rouge">none</code>, <code class="highlighter-rouge">disable</code>, <code class="highlighter-rouge">off</code>: Cleanup is only performed once. No retry will be initiated in case of failure.</li><li><code class="highlighter-rouge">fixed-delay</code>, <code class="highlighter-rouge">fixeddelay</code>: Cleanup attempts will be separated by a fixed inter [...]
+            <td>Defines the cleanup strategy to use in case of cleanup failures.<br />Accepted values are:<ul><li><code class="highlighter-rouge">none</code>, <code class="highlighter-rouge">disable</code>, <code class="highlighter-rouge">off</code>: Cleanup is only performed once. No retry will be initiated in case of failure. The job artifacts (and the job's JobResultStore entry) have to be cleaned up manually in case of a failure.</li><li><code class="highlighter-rouge">fixed-delay</c [...]
         </tr>
     </tbody>
 </table>
diff --git a/docs/layouts/shortcodes/generated/exponential_delay_cleanup_strategy_configuration.html b/docs/layouts/shortcodes/generated/exponential_delay_cleanup_strategy_configuration.html
index f301253..ea2933b 100644
--- a/docs/layouts/shortcodes/generated/exponential_delay_cleanup_strategy_configuration.html
+++ b/docs/layouts/shortcodes/generated/exponential_delay_cleanup_strategy_configuration.html
@@ -12,7 +12,7 @@
             <td><h5>cleanup-strategy.exponential-delay.attempts</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>Integer</td>
-            <td>The number of times a failed cleanup is retried if <code class="highlighter-rouge">cleanup-strategy</code> has been set to <code class="highlighter-rouge">exponential-delay</code>. (no value means: infinitely).</td>
+            <td>The number of times a failed cleanup is retried if <code class="highlighter-rouge">cleanup-strategy</code> has been set to <code class="highlighter-rouge">exponential-delay</code>. Reaching the configured limit means that the job artifacts (and the job's JobResultStore entry) might need to be cleaned up manually. Setting no value corresponds to unlimited retries.</td>
         </tr>
         <tr>
             <td><h5>cleanup-strategy.exponential-delay.initial-backoff</h5></td>
diff --git a/docs/layouts/shortcodes/generated/fixed_delay_cleanup_strategy_configuration.html b/docs/layouts/shortcodes/generated/fixed_delay_cleanup_strategy_configuration.html
index 6890b1b..6312784 100644
--- a/docs/layouts/shortcodes/generated/fixed_delay_cleanup_strategy_configuration.html
+++ b/docs/layouts/shortcodes/generated/fixed_delay_cleanup_strategy_configuration.html
@@ -12,7 +12,7 @@
             <td><h5>cleanup-strategy.fixed-delay.attempts</h5></td>
             <td style="word-wrap: break-word;">1</td>
             <td>Integer</td>
-            <td>The number of times that Flink retries the cleanup before giving up if <code class="highlighter-rouge">cleanup-strategy</code> has been set to <code class="highlighter-rouge">fixed-delay</code>.</td>
+            <td>The number of times that Flink retries the cleanup before giving up if <code class="highlighter-rouge">cleanup-strategy</code> has been set to <code class="highlighter-rouge">fixed-delay</code>. Reaching the configured limit means that the job artifacts (and the job's JobResultStore entry) might need to be cleaned up manually.</td>
         </tr>
         <tr>
             <td><h5>cleanup-strategy.fixed-delay.delay</h5></td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CleanupOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CleanupOptions.java
index 49d5dac..31268ef 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CleanupOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CleanupOptions.java
@@ -93,7 +93,9 @@ public class CleanupOptions {
                                                                             Collectors.joining(
                                                                                     ", "))
                                                             + ": Cleanup is only performed once. No retry "
-                                                            + "will be initiated in case of failure.",
+                                                            + "will be initiated in case of failure. The job "
+                                                            + "artifacts (and the job's JobResultStore entry) have "
+                                                            + "to be cleaned up manually in case of a failure.",
                                                     NONE_PARAM_VALUES.stream()
                                                             .map(TextElement::code)
                                                             .collect(Collectors.toList())
@@ -105,7 +107,9 @@ public class CleanupOptions {
                                                     "%s, %s: Cleanup attempts will be separated by a fixed "
                                                             + "interval up to the point where the cleanup is "
                                                             + "considered successful or a set amount of retries "
-                                                            + "is reached.",
+                                                            + "is reached. Reaching the configured limit means that "
+                                                            + "the job artifacts (and the job's JobResultStore entry) "
+                                                            + "might need to be cleaned up manually.",
                                                     code(FIXED_DELAY_LABEL),
                                                     code(
                                                             extractAlphaNumericCharacters(
@@ -115,7 +119,9 @@ public class CleanupOptions {
                                                             + "triggers the cleanup with an exponentially "
                                                             + "increasing delay up to the point where the "
                                                             + "cleanup succeeded or a set amount of retries "
-                                                            + "is reached.",
+                                                            + "is reached. Reaching the configured limit means that "
+                                                            + "the job artifacts (and the job's JobResultStore entry) "
+                                                            + "might need to be cleaned up manually.",
                                                     code(EXPONENTIAL_DELAY_LABEL),
                                                     code(
                                                             extractAlphaNumericCharacters(
@@ -133,7 +139,10 @@ public class CleanupOptions {
                             Description.builder()
                                     .text(
                                             "The number of times that Flink retries the cleanup "
-                                                    + "before giving up if %s has been set to %s.",
+                                                    + "before giving up if %s has been set to %s. "
+                                                    + "Reaching the configured limit means that "
+                                                    + "the job artifacts (and the job's JobResultStore entry) "
+                                                    + "might need to be cleaned up manually.",
                                             code(CLEANUP_STRATEGY_PARAM), code(FIXED_DELAY_LABEL))
                                     .build());
 
@@ -188,8 +197,11 @@ public class CleanupOptions {
                             Description.builder()
                                     .text(
                                             "The number of times a failed cleanup is retried "
-                                                    + "if %s has been set to %s. (no value means: "
-                                                    + "infinitely).",
+                                                    + "if %s has been set to %s. Reaching the "
+                                                    + "configured limit means that the job artifacts "
+                                                    + "(and the job's JobResultStore entry) "
+                                                    + "might need to be cleaned up manually. "
+                                                    + "Setting no value corresponds to unlimited retries.",
                                             code(CLEANUP_STRATEGY_PARAM),
                                             code(EXPONENTIAL_DELAY_LABEL))
                                     .build());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 393a429..68f387d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.blob.BlobServer;
@@ -44,6 +45,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.JobResultEntry;
 import org.apache.flink.runtime.highavailability.JobResultStore;
+import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -615,7 +617,22 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
 
         final CompletableFuture<Void> jobTerminationFuture =
                 cleanupJobStateFuture.thenCompose(
-                        cleanupJobState -> removeJob(jobId, cleanupJobState));
+                        cleanupJobState ->
+                                removeJob(jobId, cleanupJobState)
+                                        .exceptionally(
+                                                throwable -> {
+                                                    log.warn(
+                                                            "The cleanup of job {} failed. The job's artifacts in '{}' and its JobResultStore entry in '{}' needs to be cleaned manually.",
+                                                            jobId,
+                                                            configuration.get(
+                                                                    HighAvailabilityOptions
+                                                                            .HA_STORAGE_PATH),
+                                                            configuration.get(
+                                                                    JobResultStoreOptions
+                                                                            .STORAGE_PATH),
+                                                            throwable);
+                                                    return null;
+                                                }));
 
         FutureUtils.handleUncaughtException(
                 jobTerminationFuture,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
index 8cfb1ad..60312f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.dispatcher;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.CleanupOptions;
 import org.apache.flink.core.testutils.FlinkMatchers;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore;
@@ -49,7 +50,6 @@ import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.TestingJobGraphStore;
 import org.apache.flink.runtime.testutils.TestingJobResultStore;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TimeUtils;
 import org.apache.flink.util.concurrent.FutureUtils;
 
@@ -63,11 +63,9 @@ import org.junit.Test;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -133,12 +131,23 @@ public class DispatcherCleanupITCase extends AbstractDispatcherTest {
         final int numberOfErrors = 5;
         final RuntimeException temporaryError =
                 new RuntimeException("Expected RuntimeException: Unable to remove job graph.");
+        final AtomicInteger failureCount = new AtomicInteger(numberOfErrors);
         final JobGraphStore jobGraphStore =
-                createAndStartJobGraphStoreWithCleanupFailures(
-                        numberOfErrors,
-                        temporaryError,
-                        actualGlobalCleanupCallCount,
-                        successfulCleanupLatch);
+                TestingJobGraphStore.newBuilder()
+                        .setGlobalCleanupFunction(
+                                (ignoredJobId, ignoredExecutor) -> {
+                                    actualGlobalCleanupCallCount.incrementAndGet();
+
+                                    if (failureCount.getAndDecrement() > 0) {
+                                        return FutureUtils.completedExceptionally(temporaryError);
+                                    }
+
+                                    successfulCleanupLatch.trigger();
+                                    return FutureUtils.completedVoidFuture();
+                                })
+                        .build();
+
+        jobGraphStore.start(NoOpJobGraphListener.INSTANCE);
         haServices.setJobGraphStore(jobGraphStore);
 
         // Construct leader election service.
@@ -249,11 +258,28 @@ public class DispatcherCleanupITCase extends AbstractDispatcherTest {
 
         // Construct job graph store.
         final AtomicInteger actualGlobalCleanupCallCount = new AtomicInteger();
-        final OneShotLatch successfulCleanupLatch = new OneShotLatch();
-        final RuntimeException temporaryError = new RuntimeException("Unable to remove job graph.");
+        final OneShotLatch firstCleanupTriggered = new OneShotLatch();
+        final CompletableFuture<JobID> successfulJobGraphCleanup = new CompletableFuture<>();
         final JobGraphStore jobGraphStore =
-                createAndStartJobGraphStoreWithCleanupFailures(
-                        1, temporaryError, actualGlobalCleanupCallCount, successfulCleanupLatch);
+                TestingJobGraphStore.newBuilder()
+                        .setGlobalCleanupFunction(
+                                (actualJobId, ignoredExecutor) -> {
+                                    final int callCount =
+                                            actualGlobalCleanupCallCount.getAndIncrement();
+                                    firstCleanupTriggered.trigger();
+
+                                    if (callCount < 1) {
+                                        return FutureUtils.completedExceptionally(
+                                                new RuntimeException(
+                                                        "Expected RuntimeException: Unable to remove job graph."));
+                                    }
+
+                                    successfulJobGraphCleanup.complete(actualJobId);
+                                    return FutureUtils.completedVoidFuture();
+                                })
+                        .build();
+
+        jobGraphStore.start(NoOpJobGraphListener.INSTANCE);
         haServices.setJobGraphStore(jobGraphStore);
 
         // Construct leader election service.
@@ -262,23 +288,10 @@ public class DispatcherCleanupITCase extends AbstractDispatcherTest {
         haServices.setJobMasterLeaderElectionService(jobId, leaderElectionService);
 
         // start the dispatcher with no retries on cleanup
-        final CountDownLatch jobGraphRemovalErrorReceived = new CountDownLatch(1);
-        final Dispatcher dispatcher =
-                createTestingDispatcherBuilder()
-                        .setFatalErrorHandler(
-                                throwable -> {
-                                    final Optional<Throwable> maybeError =
-                                            ExceptionUtils.findThrowable(
-                                                    throwable, temporaryError::equals);
-                                    if (maybeError.isPresent()) {
-                                        jobGraphRemovalErrorReceived.countDown();
-                                    } else {
-                                        testingFatalErrorHandlerResource
-                                                .getFatalErrorHandler()
-                                                .onFatalError(throwable);
-                                    }
-                                })
-                        .build();
+        configuration.set(
+                CleanupOptions.CLEANUP_STRATEGY,
+                CleanupOptions.NONE_PARAM_VALUES.iterator().next());
+        final Dispatcher dispatcher = createTestingDispatcherBuilder().build();
         dispatcher.start();
 
         toTerminate.add(dispatcher);
@@ -288,7 +301,7 @@ public class DispatcherCleanupITCase extends AbstractDispatcherTest {
         dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
 
         waitForJobToFinish(leaderElectionService, dispatcherGateway, jobId);
-        jobGraphRemovalErrorReceived.await();
+        firstCleanupTriggered.await();
 
         // Remove job master leadership.
         leaderElectionService.notLeader();
@@ -296,18 +309,25 @@ public class DispatcherCleanupITCase extends AbstractDispatcherTest {
         // This will clear internal state of election service, so a new contender can register.
         leaderElectionService.stop();
 
-        assertThat(successfulCleanupLatch.isTriggered(), CoreMatchers.is(false));
+        assertThat(
+                "The cleanup should have been triggered only once.",
+                actualGlobalCleanupCallCount.get(),
+                equalTo(1));
+        assertThat(
+                "The cleanup should not have reached the successful cleanup code path.",
+                successfulJobGraphCleanup.isDone(),
+                equalTo(false));
 
         assertThat(
                 "The JobGraph is still stored in the JobGraphStore.",
                 haServices.getJobGraphStore().getJobIds(),
-                CoreMatchers.is(Collections.singleton(jobId)));
+                equalTo(Collections.singleton(jobId)));
         assertThat(
                 "The JobResultStore has this job marked as dirty.",
                 haServices.getJobResultStore().getDirtyResults().stream()
                         .map(JobResult::getJobId)
                         .collect(Collectors.toSet()),
-                CoreMatchers.is(Collections.singleton(jobId)));
+                equalTo(Collections.singleton(jobId)));
 
         // Run a second dispatcher, that restores our finished job.
         final Dispatcher secondDispatcher =
@@ -331,38 +351,11 @@ public class DispatcherCleanupITCase extends AbstractDispatcherTest {
                 "The JobResultStore has the job listed as clean.",
                 haServices.getJobResultStore().hasJobResultEntry(jobId));
 
-        // wait for the successful cleanup to be triggered
-        successfulCleanupLatch.await();
+        assertThat(successfulJobGraphCleanup.get(), equalTo(jobId));
 
         assertThat(actualGlobalCleanupCallCount.get(), equalTo(2));
     }
 
-    private JobGraphStore createAndStartJobGraphStoreWithCleanupFailures(
-            int numberOfCleanupFailures,
-            Throwable throwable,
-            AtomicInteger actualCleanupCallCount,
-            OneShotLatch successfulCleanupLatch)
-            throws Exception {
-        final AtomicInteger failureCount = new AtomicInteger(numberOfCleanupFailures);
-        final JobGraphStore jobGraphStore =
-                TestingJobGraphStore.newBuilder()
-                        .setGlobalCleanupFunction(
-                                (ignoredJobId, ignoredExecutor) -> {
-                                    actualCleanupCallCount.incrementAndGet();
-
-                                    if (failureCount.getAndDecrement() > 0) {
-                                        return FutureUtils.completedExceptionally(throwable);
-                                    }
-
-                                    successfulCleanupLatch.trigger();
-                                    return FutureUtils.completedVoidFuture();
-                                })
-                        .build();
-
-        jobGraphStore.start(null);
-        return jobGraphStore;
-    }
-
     private void waitForJobToFinish(
             TestingLeaderElectionService leaderElectionService,
             DispatcherGateway dispatcherGateway,
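
Note on the Dispatcher change in this commit: the `exceptionally(...)` stage added around `removeJob(jobId, cleanupJobState)` logs a warning and then completes the future normally, so a failed cleanup should no longer surface as a fatal error. The following is a minimal stand-alone sketch of that pattern in plain Java, not Flink code. `NonFatalCleanupSketch`, `failingCleanup`, and `cleanupWithoutFailing` are hypothetical names chosen for illustration, and `System.err` stands in for the logger. The stub is modeled on the test's cleanup function that fails a fixed number of times before succeeding.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-alone sketch; none of these names are Flink classes.
final class NonFatalCleanupSketch {

    /** Returns a cleanup stub that fails the first {@code failures} calls and succeeds afterwards. */
    static Supplier<CompletableFuture<Void>> failingCleanup(int failures, AtomicInteger callCount) {
        final AtomicInteger remainingFailures = new AtomicInteger(failures);
        return () -> {
            callCount.incrementAndGet();
            if (remainingFailures.getAndDecrement() > 0) {
                final CompletableFuture<Void> failed = new CompletableFuture<>();
                failed.completeExceptionally(
                        new RuntimeException("Expected: unable to remove job graph."));
                return failed;
            }
            return CompletableFuture.completedFuture(null);
        };
    }

    /** Reports a cleanup failure and completes normally instead of propagating the error. */
    static CompletableFuture<Void> cleanupWithoutFailing(
            Supplier<CompletableFuture<Void>> cleanup, String jobId) {
        return cleanup.get()
                .exceptionally(
                        throwable -> {
                            // Stand-in for log.warn(...): manual cleanup may be needed.
                            System.err.println(
                                    "The cleanup of job " + jobId + " failed: "
                                            + throwable.getMessage());
                            return null;
                        });
    }

    public static void main(String[] args) {
        final AtomicInteger calls = new AtomicInteger();
        final Supplier<CompletableFuture<Void>> cleanup = failingCleanup(1, calls);

        // The first attempt fails internally, but the returned future completes normally.
        final CompletableFuture<Void> first = cleanupWithoutFailing(cleanup, "job-1");
        System.out.println("completed exceptionally: " + first.isCompletedExceptionally());

        // The second attempt takes the successful code path.
        final CompletableFuture<Void> second = cleanupWithoutFailing(cleanup, "job-1");
        System.out.println("calls: " + calls.get() + ", done: " + second.isDone());
    }
}
```

The trade-off in this shape is that callers can no longer tell a failed cleanup from a successful one by inspecting the future, which is why the warning has to name the locations that need manual attention.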

[flink] 03/06: [hotfix][docs] Uses @OverrideDefault instead of noDefaultValue for exponential-delay.attempts

Posted by ma...@apache.org.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0f870d574767dad2a6fd428c7cf00509c3d68bd2
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Wed Mar 16 13:19:51 2022 +0100

    [hotfix][docs] Uses @OverrideDefault instead of noDefaultValue for exponential-delay.attempts
---
 .../exponential_delay_cleanup_strategy_configuration.html   |  4 ++--
 .../java/org/apache/flink/configuration/CleanupOptions.java |  7 ++++---
 .../dispatcher/cleanup/CleanupRetryStrategyFactory.java     |  4 +---
 .../dispatcher/cleanup/CleanupRetryStrategyFactoryTest.java | 13 +++++++++----
 4 files changed, 16 insertions(+), 12 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/exponential_delay_cleanup_strategy_configuration.html b/docs/layouts/shortcodes/generated/exponential_delay_cleanup_strategy_configuration.html
index ea2933b..23f13e0 100644
--- a/docs/layouts/shortcodes/generated/exponential_delay_cleanup_strategy_configuration.html
+++ b/docs/layouts/shortcodes/generated/exponential_delay_cleanup_strategy_configuration.html
@@ -10,9 +10,9 @@
     <tbody>
         <tr>
             <td><h5>cleanup-strategy.exponential-delay.attempts</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
+            <td style="word-wrap: break-word;">infinite</td>
             <td>Integer</td>
-            <td>The number of times a failed cleanup is retried if <code class="highlighter-rouge">cleanup-strategy</code> has been set to <code class="highlighter-rouge">exponential-delay</code>. Reaching the configured limit means that the job artifacts (and the job's JobResultStore entry) might need to be cleaned up manually. Setting no value corresponds to unlimited retries.</td>
+            <td>The number of times a failed cleanup is retried if <code class="highlighter-rouge">cleanup-strategy</code> has been set to <code class="highlighter-rouge">exponential-delay</code>. Reaching the configured limit means that the job artifacts (and the job's JobResultStore entry) might need to be cleaned up manually.</td>
         </tr>
         <tr>
             <td><h5>cleanup-strategy.exponential-delay.initial-backoff</h5></td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CleanupOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CleanupOptions.java
index 7c6ff64..1efc4c5 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CleanupOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CleanupOptions.java
@@ -21,6 +21,7 @@ package org.apache.flink.configuration;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.docs.ConfigGroup;
 import org.apache.flink.annotation.docs.ConfigGroups;
+import org.apache.flink.annotation.docs.Documentation;
 import org.apache.flink.configuration.description.Description;
 import org.apache.flink.configuration.description.TextElement;
 
@@ -190,10 +191,11 @@ public class CleanupOptions {
                                             code(EXPONENTIAL_DELAY_LABEL))
                                     .build());
 
+    @Documentation.OverrideDefault("infinite")
     public static final ConfigOption<Integer> CLEANUP_STRATEGY_EXPONENTIAL_DELAY_MAX_ATTEMPTS =
             ConfigOptions.key(createExponentialBackoffParameterPrefix("attempts"))
                     .intType()
-                    .noDefaultValue()
+                    .defaultValue(Integer.MAX_VALUE)
                     .withDescription(
                             Description.builder()
                                     .text(
@@ -201,8 +203,7 @@ public class CleanupOptions {
                                                     + "if %s has been set to %s. Reaching the "
                                                     + "configured limit means that the job artifacts "
                                                     + "(and the job's JobResultStore entry) "
-                                                    + "might need to be cleaned up manually. "
-                                                    + "Setting no value corresponds to unlimited retries.",
+                                                    + "might need to be cleaned up manually.",
                                             code(CLEANUP_STRATEGY_PARAM),
                                             code(EXPONENTIAL_DELAY_LABEL))
                                     .build());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CleanupRetryStrategyFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CleanupRetryStrategyFactory.java
index 79557b0..1ec88a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CleanupRetryStrategyFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CleanupRetryStrategyFactory.java
@@ -78,9 +78,7 @@ public enum CleanupRetryStrategyFactory {
         final Duration maxDelay =
                 configuration.get(CleanupOptions.CLEANUP_STRATEGY_EXPONENTIAL_DELAY_MAX_BACKOFF);
         final int maxAttempts =
-                configuration.getInteger(
-                        CleanupOptions.CLEANUP_STRATEGY_EXPONENTIAL_DELAY_MAX_ATTEMPTS,
-                        Integer.MAX_VALUE);
+                configuration.get(CleanupOptions.CLEANUP_STRATEGY_EXPONENTIAL_DELAY_MAX_ATTEMPTS);
 
         return new ExponentialBackoffRetryStrategy(maxAttempts, minDelay, maxDelay);
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CleanupRetryStrategyFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CleanupRetryStrategyFactoryTest.java
index 9cf1db6..c1dc76b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CleanupRetryStrategyFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CleanupRetryStrategyFactoryTest.java
@@ -65,7 +65,7 @@ class CleanupRetryStrategyFactoryTest {
                 new Configuration(),
                 CleanupOptions.CLEANUP_STRATEGY_EXPONENTIAL_DELAY_INITIAL_BACKOFF.defaultValue(),
                 CleanupOptions.CLEANUP_STRATEGY_EXPONENTIAL_DELAY_MAX_BACKOFF.defaultValue(),
-                Integer.MAX_VALUE);
+                CleanupOptions.CLEANUP_STRATEGY_EXPONENTIAL_DELAY_MAX_ATTEMPTS.defaultValue());
     }
 
     private static Configuration createConfigurationWithRetryStrategy(String configValue) {
@@ -162,7 +162,7 @@ class CleanupRetryStrategyFactoryTest {
                 config,
                 CleanupOptions.CLEANUP_STRATEGY_EXPONENTIAL_DELAY_INITIAL_BACKOFF.defaultValue(),
                 CleanupOptions.CLEANUP_STRATEGY_EXPONENTIAL_DELAY_MAX_BACKOFF.defaultValue(),
-                Integer.MAX_VALUE);
+                CleanupOptions.CLEANUP_STRATEGY_EXPONENTIAL_DELAY_MAX_ATTEMPTS.defaultValue());
     }
 
     @Test
@@ -180,7 +180,7 @@ class CleanupRetryStrategyFactoryTest {
                 config,
                 customMinDelay,
                 CleanupOptions.CLEANUP_STRATEGY_EXPONENTIAL_DELAY_MAX_BACKOFF.defaultValue(),
-                Integer.MAX_VALUE);
+                CleanupOptions.CLEANUP_STRATEGY_EXPONENTIAL_DELAY_MAX_ATTEMPTS.defaultValue());
     }
 
     @Test
@@ -197,7 +197,7 @@ class CleanupRetryStrategyFactoryTest {
                 config,
                 CleanupOptions.CLEANUP_STRATEGY_EXPONENTIAL_DELAY_INITIAL_BACKOFF.defaultValue(),
                 customMaxDelay,
-                Integer.MAX_VALUE);
+                CleanupOptions.CLEANUP_STRATEGY_EXPONENTIAL_DELAY_MAX_ATTEMPTS.defaultValue());
     }
 
     @Test
@@ -207,6 +207,11 @@ class CleanupRetryStrategyFactoryTest {
         // 13 is the minimum we can use for this test; otherwise, assertMaxDelay would fail due to a
         // Precondition in ExponentialBackoffRetryStrategy
         final int customMaxAttempts = 13;
+        Preconditions.checkArgument(
+                customMaxAttempts
+                        != CleanupOptions.CLEANUP_STRATEGY_EXPONENTIAL_DELAY_MAX_ATTEMPTS
+                                .defaultValue(),
+                "The custom value should be different from the default value to make it possible that the overwritten value is selected.");
         config.set(
                 CleanupOptions.CLEANUP_STRATEGY_EXPONENTIAL_DELAY_MAX_ATTEMPTS, customMaxAttempts);
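
Note on the default-value change in this commit: the fallback `Integer.MAX_VALUE` moves from the call site (`configuration.getInteger(option, Integer.MAX_VALUE)`) into the option definition (`defaultValue(Integer.MAX_VALUE)`), so every reader of the option sees the same default. The following is a minimal stand-alone sketch of that idea in plain Java. `DefaultValueSketch`, `IntOption`, and `get` are hypothetical, simplified stand-ins and not Flink's `ConfigOption` API; only the key name and the default are taken from the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone sketch; IntOption and get(...) are simplified stand-ins,
// not Flink's ConfigOption API.
final class DefaultValueSketch {

    /** A typed option that carries its own default value. */
    static final class IntOption {
        final String key;
        final int defaultValue;

        IntOption(String key, int defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }
    }

    // The default lives on the option itself rather than at each call site.
    static final IntOption MAX_ATTEMPTS =
            new IntOption("cleanup-strategy.exponential-delay.attempts", Integer.MAX_VALUE);

    /** Returns the configured value, or the option's own default if the key is unset. */
    static int get(Map<String, String> config, IntOption option) {
        final String raw = config.get(option.key);
        return raw == null ? option.defaultValue : Integer.parseInt(raw);
    }

    public static void main(String[] args) {
        final Map<String, String> config = new HashMap<>();

        // Unset: falls back to the default stored on the option.
        System.out.println(get(config, MAX_ATTEMPTS) == Integer.MAX_VALUE);

        // Set: the configured value wins over the default.
        config.put(MAX_ATTEMPTS.key, "13");
        System.out.println(get(config, MAX_ATTEMPTS));
    }
}
```

With the default on the option, tests can compare against the option's default instead of repeating the literal, which is the same substitution the test changes in this commit make.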
 

[flink] 05/06: [hotfix][docs] Adds missing JRS configuration parameter in Chinese documentation

Posted by ma...@apache.org.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 58d1781ea78a6d04c3402f50eb0a4818307e20a1
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Tue Mar 15 21:06:32 2022 +0100

    [hotfix][docs] Adds missing JRS configuration parameter in Chinese documentation
---
 docs/content.zh/docs/deployment/config.md | 4 ++++
 docs/content/docs/deployment/config.md    | 2 +-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/docs/content.zh/docs/deployment/config.md b/docs/content.zh/docs/deployment/config.md
index 9348ecc..912ac8b 100644
--- a/docs/content.zh/docs/deployment/config.md
+++ b/docs/content.zh/docs/deployment/config.md
@@ -166,6 +166,10 @@ The JobManager ensures consistency during recovery across TaskManagers. For the
 
 {{< generated/common_high_availability_section >}}
 
+**Options for the JobResultStore in high-availability setups**
+
+{{< generated/common_high_availability_jrs_section >}}
+
 **Options for high-availability setups with ZooKeeper**
 
 {{< generated/common_high_availability_zk_section >}}
diff --git a/docs/content/docs/deployment/config.md b/docs/content/docs/deployment/config.md
index 9a00287..a38e72d 100644
--- a/docs/content/docs/deployment/config.md
+++ b/docs/content/docs/deployment/config.md
@@ -167,7 +167,7 @@ The JobManager ensures consistency during recovery across TaskManagers. For the
 
 {{< generated/common_high_availability_section >}}
 
-**Options for the Job Result Store in high-availability setups**
+**Options for the JobResultStore in high-availability setups**
 
 {{< generated/common_high_availability_jrs_section >}}