Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/20 15:21:01 UTC

[GitHub] [flink] zentol commented on a change in pull request #18416: [FLINK-25715][clients] Add deployment option (`execution.submit-failed-job-on-application-error`) for submitting a failed job when there is an error in application driver.

zentol commented on a change in pull request #18416:
URL: https://github.com/apache/flink/pull/18416#discussion_r788642062



##########
File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -266,10 +271,22 @@ private void runApplicationEntryPoint(
             final Set<JobID> tolerateMissingResult,
             final DispatcherGateway dispatcherGateway,
             final ScheduledExecutor scheduledExecutor,
-            final boolean enforceSingleJobExecution) {
+            final boolean enforceSingleJobExecution,
+            final boolean submitFailedJobOnApplicationError) {
+        if (submitFailedJobOnApplicationError && !enforceSingleJobExecution) {
+            dispatcherGateway.submitFailedJob(
+                    ZERO_JOB_ID,
+                    FAILED_JOB_NAME,
+                    new IllegalStateException(
+                            String.format(
+                                    "Submission of failed job in case of an application error ('%s') is not supported in non-HA setups.",

Review comment:
       I'm confused. Submitting a failed job is not supported (for some reason), yet we are doing exactly that right here?
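The guard in the diff can be modeled in isolation. The sketch below assumes the intent is to surface the unsupported flag combination as a failed job that carries an explanatory `IllegalStateException`, instead of throwing from the bootstrap. `FailedJobSink`, `EntryPointGuardSketch`, the placeholder values of `ZERO_JOB_ID` and `FAILED_JOB_NAME`, and the use of the option key as the `%s` argument are all hypothetical, because the diff is truncated before the format arguments.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the one DispatcherGateway method used in the diff. */
interface FailedJobSink {
    void submitFailedJob(String jobId, String jobName, Throwable cause);
}

final class EntryPointGuardSketch {
    // Hypothetical placeholder values; the real constants are not shown in the diff.
    static final String ZERO_JOB_ID = "00000000000000000000000000000000";
    static final String FAILED_JOB_NAME = "(application driver)";
    // Option key taken from the PR title; assumed to be the '%s' argument.
    static final String OPTION_KEY = "execution.submit-failed-job-on-application-error";

    /**
     * Returns true if the application entry point may run. Otherwise the unsupported
     * combination is reported as a failed job (not thrown) and false is returned.
     */
    static boolean checkSupported(
            FailedJobSink sink,
            boolean enforceSingleJobExecution,
            boolean submitFailedJobOnApplicationError) {
        if (submitFailedJobOnApplicationError && !enforceSingleJobExecution) {
            sink.submitFailedJob(
                    ZERO_JOB_ID,
                    FAILED_JOB_NAME,
                    new IllegalStateException(
                            String.format(
                                    "Submission of failed job in case of an application error ('%s') is not supported in non-HA setups.",
                                    OPTION_KEY)));
            return false;
        }
        return true;
    }
}

/** Test double that records the cause of every failed-job submission. */
final class RecordingSink implements FailedJobSink {
    final List<Throwable> causes = new ArrayList<>();

    @Override
    public void submitFailedJob(String jobId, String jobName, Throwable cause) {
        causes.add(cause);
    }
}
```

Modeled this way, the "not supported" message is the payload of the failed job rather than a reason to skip the submission, which may be the source of the confusion raised above.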

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
##########
@@ -230,6 +234,10 @@ public DispatcherId getFencingToken() {
         private BiFunction<JobID, String, CompletableFuture<String>>
                 stopWithSavepointAndGetLocationFunction;
 
+        public Builder() {

Review comment:
       Make this constructor (package-)private so that there's only one way to create the builder?
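A minimal sketch of the suggested builder shape, using a hypothetical `TestingGateway` rather than the real `TestingDispatcherGateway`: the nested `Builder` constructor is private, so the static `newBuilder()` factory (the form `ApplicationDispatcherBootstrapTest` already switches to) is the only way to obtain a builder.

```java
import java.util.Objects;
import java.util.function.Supplier;

/** Hypothetical simplified gateway; only the builder shape mirrors the review suggestion. */
final class TestingGateway {
    private final Supplier<String> clusterNameSupplier;

    private TestingGateway(Supplier<String> clusterNameSupplier) {
        this.clusterNameSupplier = clusterNameSupplier;
    }

    String requestClusterName() {
        return clusterNameSupplier.get();
    }

    /** The single entry point for creating a builder. */
    static Builder newBuilder() {
        return new Builder();
    }

    static final class Builder {
        private Supplier<String> clusterNameSupplier = () -> "default";

        // Private: callers must go through TestingGateway.newBuilder().
        // The enclosing class may still call it, because Java grants it access.
        private Builder() {}

        Builder setClusterNameSupplier(Supplier<String> supplier) {
            this.clusterNameSupplier = Objects.requireNonNull(supplier);
            return this;
        }

        TestingGateway build() {
            return new TestingGateway(clusterNameSupplier);
        }
    }
}
```

With a private constructor, `new TestingGateway.Builder()` no longer compiles outside `TestingGateway`; a package-private constructor would still allow it from classes in the same package, which is the trade-off behind the "(package-)" in the comment.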

##########
File path: flink-clients/src/test/java/org/apache/flink/client/testjar/FailingJob.java
##########
@@ -0,0 +1,35 @@
+package org.apache.flink.client.testjar;

Review comment:
       Missing license header.
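For reference, this is the standard ASF source header that Java files in Apache projects such as Flink begin with, placed above the `package` declaration. Copying it from a neighbouring file in the same module is the safest way to avoid whitespace differences.

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.client.testjar;
```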

##########
File path: flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
##########
@@ -92,7 +94,7 @@ public void cleanup() {
     @Test
     public void testExceptionThrownWhenApplicationContainsNoJobs() throws Throwable {
         final TestingDispatcherGateway.Builder dispatcherBuilder =
-                new TestingDispatcherGateway.Builder()
+                TestingDispatcherGateway.newBuilder()

Review comment:
       Can we move these changes to the previous commit?

##########
File path: flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java
##########
@@ -147,6 +153,57 @@ public void testDispatcherRecoversAfterLosingAndRegainingLeadership() throws Exc
         }
     }
 
+    @Test
+    public void testSubmitFailedJobOnApplicationError() throws Exception {
+        final Deadline deadline = Deadline.fromNow(TIMEOUT);
+        final JobID jobId = new JobID();
+        final Configuration configuration = new Configuration();
+        configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());
+        configuration.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME);
+        configuration.set(ClientOptions.CLIENT_RETRY_PERIOD, Duration.ofMillis(100));
+        configuration.set(DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH, false);
+        configuration.set(DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR, true);
+        configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId.toHexString());
+        final TestingMiniClusterConfiguration clusterConfiguration =
+                TestingMiniClusterConfiguration.newBuilder()
+                        .setConfiguration(configuration)
+                        .build();
+        final EmbeddedHaServicesWithLeadershipControl haServices =
+                new EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor());
+        final TestingMiniCluster.Builder clusterBuilder =
+                TestingMiniCluster.newBuilder(clusterConfiguration)
+                        .setHighAvailabilityServicesSupplier(() -> haServices)
+                        .setDispatcherResourceManagerComponentFactorySupplier(
+                                createApplicationModeDispatcherResourceManagerComponentFactorySupplier(
+                                        clusterConfiguration.getConfiguration(),
+                                        FailingJob.getProgram()));
+        try (final MiniCluster cluster = clusterBuilder.build()) {
+
+            // start mini cluster and submit the job
+            cluster.start();
+
+            // wait until job is running

Review comment:
       Suggest changing this comment to `// wait until job was submitted`.
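The wait step could be expressed as a bounded poll, in the spirit of the test's `Deadline` and `CLIENT_RETRY_PERIOD` settings. `WaitUtil.waitUntil` below is a hypothetical helper, not a Flink API; in the test, the condition would check whether the job with `jobId` has been submitted, and how to query that from the `MiniCluster` is left open here.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

/** Hypothetical polling helper; not part of Flink's test utilities. */
final class WaitUtil {
    private WaitUtil() {}

    /**
     * Evaluates {@code condition} until it returns true, sleeping {@code retryPeriod}
     * between attempts, and throws a TimeoutException once {@code timeout} has elapsed.
     */
    static void waitUntil(BooleanSupplier condition, Duration timeout, Duration retryPeriod)
            throws InterruptedException, TimeoutException {
        final long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            // Compare via subtraction so the check is safe against nanoTime overflow.
            if (System.nanoTime() - deadlineNanos >= 0) {
                throw new TimeoutException("Condition not met within " + timeout);
            }
            Thread.sleep(retryPeriod.toMillis());
        }
    }
}
```

Polling for "submitted" rather than "running" also avoids coupling the test to a job state that a deliberately failing application may never reach.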




-- 
This is an automated message from the Apache Git Service.