Posted to issues@flink.apache.org by "dmvk (via GitHub)" <gi...@apache.org> on 2023/04/17 08:37:45 UTC

[GitHub] [flink] dmvk opened a new pull request, #22408: [FLINK-31803] Harden UpdateJobResourceRequirementsRecoveryITCase.

dmvk opened a new pull request, #22408:
URL: https://github.com/apache/flink/pull/22408

   This fixes a race condition in which HA data could be cleaned up accidentally because the job transitioned to a terminal state.
   
   https://issues.apache.org/jira/browse/FLINK-31803


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] dmvk commented on pull request #22408: [FLINK-31803] Harden UpdateJobResourceRequirementsRecoveryITCase.

Posted by "dmvk (via GitHub)" <gi...@apache.org>.
dmvk commented on PR #22408:
URL: https://github.com/apache/flink/pull/22408#issuecomment-1512562966

   > Couldn't the test still fail if the update call is made while a restarted job is still initializing?
   
   We don't make any more updates after spinning up the 2nd cluster. The job restart this PR refers to is caused by race conditions during `closeAsyncWithoutCleaningHighAvailabilityData`, which happens after the update.




[GitHub] [flink] zentol commented on a diff in pull request #22408: [FLINK-31803] Harden UpdateJobResourceRequirementsRecoveryITCase.

Posted by "zentol (via GitHub)" <gi...@apache.org>.
zentol commented on code in PR #22408:
URL: https://github.com/apache/flink/pull/22408#discussion_r1169765749


##########
flink-tests/src/test/java/org/apache/flink/test/recovery/UpdateJobResourceRequirementsRecoveryITCase.java:
##########
@@ -61,15 +64,25 @@ class UpdateJobResourceRequirementsRecoveryITCase {
     /** Tests that a rescaled job graph will be recovered with the latest parallelism. */
     @Test
     void testRescaledJobGraphsWillBeRecoveredCorrectly(@TempDir Path tmpFolder) throws Exception {
+        final Configuration configuration = new Configuration();
+
+        // Only run the test if AdaptiveScheduler is enabled.
+        assumeThat(ClusterOptions.isAdaptiveSchedulerEnabled(configuration)).isTrue();
+
         final JobVertex jobVertex = new JobVertex("operator");
         jobVertex.setParallelism(1);
         jobVertex.setInvokableClass(BlockingNoOpInvokable.class);
         final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex);
         final JobID jobId = jobGraph.getJobID();
 
-        final Configuration configuration = new Configuration();
+        // We need to have a restart strategy set, to prevent the job from failing during the first
+        // cluster shutdown when TM disconnects.
+        configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
+        configuration.set(
+                RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, Integer.MAX_VALUE);
+        configuration.set(
+                RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofMillis(100));
 
-        configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);

Review Comment:
   I'd just consider this a test for the adaptive scheduler, and we don't categorically skip those in PRs.
   
   If test times are a concern, well then those should be addressed anyway.







[GitHub] [flink] dmvk commented on a diff in pull request #22408: [FLINK-31803] Harden UpdateJobResourceRequirementsRecoveryITCase.

Posted by "dmvk (via GitHub)" <gi...@apache.org>.
dmvk commented on code in PR #22408:
URL: https://github.com/apache/flink/pull/22408#discussion_r1172509230


##########
flink-tests/src/test/java/org/apache/flink/test/recovery/UpdateJobResourceRequirementsRecoveryITCase.java:
##########
@@ -61,15 +64,25 @@ class UpdateJobResourceRequirementsRecoveryITCase {
     /** Tests that a rescaled job graph will be recovered with the latest parallelism. */
     @Test
     void testRescaledJobGraphsWillBeRecoveredCorrectly(@TempDir Path tmpFolder) throws Exception {
+        final Configuration configuration = new Configuration();
+
+        // Only run the test if AdaptiveScheduler is enabled.
+        assumeThat(ClusterOptions.isAdaptiveSchedulerEnabled(configuration)).isTrue();
+
         final JobVertex jobVertex = new JobVertex("operator");
         jobVertex.setParallelism(1);
         jobVertex.setInvokableClass(BlockingNoOpInvokable.class);
         final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex);
         final JobID jobId = jobGraph.getJobID();
 
-        final Configuration configuration = new Configuration();
+        // We need to have a restart strategy set, to prevent the job from failing during the first
+        // cluster shutdown when TM disconnects.
+        configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
+        configuration.set(
+                RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, Integer.MAX_VALUE);
+        configuration.set(
+                RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofMillis(100));
 
-        configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);

Review Comment:
   > I'd just consider this a test for the adaptive scheduler, and we don't categorically skip those in PRs.
   
   Fixed.
   
   > If test times are a concern, well then those should be addressed anyway.
   
   The proper fix would be to have a real HA setup for testing that doesn't require running Zookeeper, which is out of scope for now 😢 





[GitHub] [flink] dmvk commented on pull request #22408: [FLINK-31803] Harden UpdateJobResourceRequirementsRecoveryITCase.

Posted by "dmvk (via GitHub)" <gi...@apache.org>.
dmvk commented on PR #22408:
URL: https://github.com/apache/flink/pull/22408#issuecomment-1511537581

   > Which terminal state did the job enter, and why?
   
   FAILED because during TM disconnect, the task failed, and the NoRestartStrategy tore the job down.
   
   >
   
   I think this is prevented by calling `waitUntilJobInitializationFinished` before the update, or am I missing something?
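The failure mode described above can be illustrated with a small toy model. This is only a sketch under stated assumptions, not Flink code: `RestartSketch`, `RestartPolicy`, `JobState`, `onTaskFailure`, and `haDataCleanedUp` are hypothetical names invented for illustration. The only behaviour taken from the discussion is that a job without a restart strategy goes to FAILED when its task fails, and that reaching a terminal state can lead to HA data being cleaned up.

```java
/** Toy model of the race discussed above. Hypothetical names, NOT Flink's implementation. */
final class RestartSketch {

    /** Simplified job states; only FAILED is terminal in this sketch. */
    enum JobState {
        RUNNING(false),
        RESTARTING(false),
        FAILED(true);

        final boolean terminal;

        JobState(boolean terminal) {
            this.terminal = terminal;
        }
    }

    /** Hypothetical stand-in for a restart strategy. */
    interface RestartPolicy {
        /** failuresSoFar counts the current failure as well. */
        boolean canRestart(int failuresSoFar);
    }

    /** Models the "no restart" case: the first task failure is fatal. */
    static final RestartPolicy NO_RESTART = failures -> false;

    /** Models a fixed-delay strategy with a bounded number of attempts (the delay is omitted). */
    static RestartPolicy fixedDelay(int maxAttempts) {
        return failures -> failures <= maxAttempts;
    }

    /** A task failure either restarts the job or moves it to the terminal FAILED state. */
    static JobState onTaskFailure(RestartPolicy policy, int failuresSoFar) {
        return policy.canRestart(failuresSoFar) ? JobState.RESTARTING : JobState.FAILED;
    }

    /** Assumption of this sketch: HA data is cleaned up only for terminal states. */
    static boolean haDataCleanedUp(JobState state) {
        return state.terminal;
    }

    public static void main(String[] args) {
        JobState withoutRestarts = onTaskFailure(NO_RESTART, 1);
        JobState withRestarts = onTaskFailure(fixedDelay(Integer.MAX_VALUE), 1);

        System.out.println("no restart strategy: " + withoutRestarts
                + ", HA data cleaned up: " + haDataCleanedUp(withoutRestarts));
        System.out.println("fixed-delay strategy: " + withRestarts
                + ", HA data cleaned up: " + haDataCleanedUp(withRestarts));
    }
}
```

In this model the restart strategy does not bring the job back to RUNNING during shutdown; it only keeps the job out of a terminal state, so the second cluster still finds the HA data it needs to recover.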




[GitHub] [flink] zentol commented on a diff in pull request #22408: [FLINK-31803] Harden UpdateJobResourceRequirementsRecoveryITCase.

Posted by "zentol (via GitHub)" <gi...@apache.org>.
zentol commented on code in PR #22408:
URL: https://github.com/apache/flink/pull/22408#discussion_r1168703853


##########
flink-tests/src/test/java/org/apache/flink/test/recovery/UpdateJobResourceRequirementsRecoveryITCase.java:
##########
@@ -61,15 +64,25 @@ class UpdateJobResourceRequirementsRecoveryITCase {
     /** Tests that a rescaled job graph will be recovered with the latest parallelism. */
     @Test
     void testRescaledJobGraphsWillBeRecoveredCorrectly(@TempDir Path tmpFolder) throws Exception {
+        final Configuration configuration = new Configuration();
+
+        // Only run the test if AdaptiveScheduler is enabled.
+        assumeThat(ClusterOptions.isAdaptiveSchedulerEnabled(configuration)).isTrue();
+
         final JobVertex jobVertex = new JobVertex("operator");
         jobVertex.setParallelism(1);
         jobVertex.setInvokableClass(BlockingNoOpInvokable.class);
         final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex);
         final JobID jobId = jobGraph.getJobID();
 
-        final Configuration configuration = new Configuration();
+        // We need to have a restart strategy set, to prevent the job from failing during the first
+        // cluster shutdown when TM disconnects.
+        configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
+        configuration.set(
+                RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, Integer.MAX_VALUE);
+        configuration.set(
+                RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofMillis(100));
 
-        configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);

Review Comment:
   I don't understand this change.





[GitHub] [flink] dmvk commented on a diff in pull request #22408: [FLINK-31803] Harden UpdateJobResourceRequirementsRecoveryITCase.

Posted by "dmvk (via GitHub)" <gi...@apache.org>.
dmvk commented on code in PR #22408:
URL: https://github.com/apache/flink/pull/22408#discussion_r1168855893


##########
flink-tests/src/test/java/org/apache/flink/test/recovery/UpdateJobResourceRequirementsRecoveryITCase.java:
##########
@@ -61,15 +64,25 @@ class UpdateJobResourceRequirementsRecoveryITCase {
     /** Tests that a rescaled job graph will be recovered with the latest parallelism. */
     @Test
     void testRescaledJobGraphsWillBeRecoveredCorrectly(@TempDir Path tmpFolder) throws Exception {
+        final Configuration configuration = new Configuration();
+
+        // Only run the test if AdaptiveScheduler is enabled.
+        assumeThat(ClusterOptions.isAdaptiveSchedulerEnabled(configuration)).isTrue();
+
         final JobVertex jobVertex = new JobVertex("operator");
         jobVertex.setParallelism(1);
         jobVertex.setInvokableClass(BlockingNoOpInvokable.class);
         final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex);
         final JobID jobId = jobGraph.getJobID();
 
-        final Configuration configuration = new Configuration();
+        // We need to have a restart strategy set, to prevent the job from failing during the first
+        // cluster shutdown when TM disconnects.
+        configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
+        configuration.set(
+                RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, Integer.MAX_VALUE);
+        configuration.set(
+                RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofMillis(100));
 
-        configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);

Review Comment:
   This is meant in combination with:
   
   ```
           assumeThat(ClusterOptions.isAdaptiveSchedulerEnabled(configuration)).isTrue();
   ```
   
   which ensures that the adaptive scheduler is already enabled whenever the test runs.
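For readers unfamiliar with assumptions in tests, the following is a minimal sketch of the general idea, written in plain Java so that it is self-contained. `AssumptionSketch`, `AssumptionViolated`, `assumeTrue`, `Outcome`, and `run` are hypothetical names for illustration and are not the AssertJ or JUnit API; the point is only that a violated assumption marks the test as skipped instead of failed.

```java
/** Toy test runner showing how an assumption skips a test. Hypothetical names, not a real framework. */
final class AssumptionSketch {

    /** Marker exception that the runner below treats as "skipped". */
    static final class AssumptionViolated extends RuntimeException {
        AssumptionViolated(String message) {
            super(message);
        }
    }

    enum Outcome {
        PASSED,
        SKIPPED,
        FAILED
    }

    /** Aborts the current test when the precondition does not hold. */
    static void assumeTrue(boolean condition, String message) {
        if (!condition) {
            throw new AssumptionViolated(message);
        }
    }

    /** Runs a test body and classifies the result. */
    static Outcome run(Runnable test) {
        try {
            test.run();
            return Outcome.PASSED;
        } catch (AssumptionViolated e) {
            // Precondition not met: the test is skipped, not failed.
            return Outcome.SKIPPED;
        } catch (RuntimeException | AssertionError e) {
            return Outcome.FAILED;
        }
    }

    public static void main(String[] args) {
        // Stand-in for "is the adaptive scheduler enabled in this test run?".
        boolean adaptiveSchedulerEnabled = false;

        Outcome outcome = run(() -> {
            assumeTrue(adaptiveSchedulerEnabled, "adaptive scheduler is not enabled");
            // The scheduler-specific test logic would follow here.
        });
        System.out.println(outcome);
    }
}
```

With this pattern the test body never has to set the scheduler itself, because it only executes in runs where the precondition already holds.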





[GitHub] [flink] flinkbot commented on pull request #22408: [FLINK-31803] Harden UpdateJobResourceRequirementsRecoveryITCase.

Posted by "flinkbot (via GitHub)" <gi...@apache.org>.
flinkbot commented on PR #22408:
URL: https://github.com/apache/flink/pull/22408#issuecomment-1510944625

   ## CI report:
   
   * 41f7437e63f08e359060ff394d84f93579e19783 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] dmvk commented on pull request #22408: [FLINK-31803] Harden UpdateJobResourceRequirementsRecoveryITCase.

Posted by "dmvk (via GitHub)" <gi...@apache.org>.
dmvk commented on PR #22408:
URL: https://github.com/apache/flink/pull/22408#issuecomment-1516237983

   @zentol all comments should be addressed, PTAL




[GitHub] [flink] dmvk merged pull request #22408: [FLINK-31803] Harden UpdateJobResourceRequirementsRecoveryITCase.

Posted by "dmvk (via GitHub)" <gi...@apache.org>.
dmvk merged PR #22408:
URL: https://github.com/apache/flink/pull/22408




[GitHub] [flink] zentol commented on pull request #22408: [FLINK-31803] Harden UpdateJobResourceRequirementsRecoveryITCase.

Posted by "zentol (via GitHub)" <gi...@apache.org>.
zentol commented on PR #22408:
URL: https://github.com/apache/flink/pull/22408#issuecomment-1512769170

   > FAILED because during TM disconnect, the task failed, and the NoRestartStrategy tore the job down.
   
   How does the restart strategy fix that? If the TM disconnects (which I assume happens due to the shutdown), surely the job won't reach a running state again.




[GitHub] [flink] dmvk commented on a diff in pull request #22408: [FLINK-31803] Harden UpdateJobResourceRequirementsRecoveryITCase.

Posted by "dmvk (via GitHub)" <gi...@apache.org>.
dmvk commented on code in PR #22408:
URL: https://github.com/apache/flink/pull/22408#discussion_r1169590475


##########
flink-tests/src/test/java/org/apache/flink/test/recovery/UpdateJobResourceRequirementsRecoveryITCase.java:
##########
@@ -61,15 +64,25 @@ class UpdateJobResourceRequirementsRecoveryITCase {
     /** Tests that a rescaled job graph will be recovered with the latest parallelism. */
     @Test
     void testRescaledJobGraphsWillBeRecoveredCorrectly(@TempDir Path tmpFolder) throws Exception {
+        final Configuration configuration = new Configuration();
+
+        // Only run the test if AdaptiveScheduler is enabled.
+        assumeThat(ClusterOptions.isAdaptiveSchedulerEnabled(configuration)).isTrue();
+
         final JobVertex jobVertex = new JobVertex("operator");
         jobVertex.setParallelism(1);
         jobVertex.setInvokableClass(BlockingNoOpInvokable.class);
         final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex);
         final JobID jobId = jobGraph.getJobID();
 
-        final Configuration configuration = new Configuration();
+        // We need to have a restart strategy set, to prevent the job from failing during the first
+        // cluster shutdown when TM disconnects.
+        configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
+        configuration.set(
+                RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, Integer.MAX_VALUE);
+        configuration.set(
+                RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofMillis(100));
 
-        configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);

Review Comment:
   The main idea is to skip this test when smoke testing the default scheduler, because it can run for more than 10 s.


