Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/20 08:20:35 UTC

[GitHub] [flink] kl0u opened a new pull request #13699: [FLINK-19154] ApplicationDispatcherBootstrap clean up HA data only when necessary

kl0u opened a new pull request #13699:
URL: https://github.com/apache/flink/pull/13699


   ## What is the purpose of the change
   
   Currently, in Application Mode, we always clean up the HA data of a job when we shut down the cluster. This includes the cases where the job fails due to a transient error. This behaviour leaves the job unable to restart even if HA is activated. This PR fixes the issue by cleaning up the HA data only when the job has explicitly reported reaching a terminal state (`FAILED`, `SUCCEEDED`, or `CANCELED`), and not as a reaction to framework failures or other issues.
   
   **NOTE TO REVIEWERS:** Maybe in some cases, we would like to not even shut down the cluster. But I do not think we can find all these cases and filter by specific exceptions so at least now we can go through the restart process without problems.
   
   ## Brief change log
   
   The changes are split into 3 commits. The main part related to the problem is in `ApplicationDispatcherBootstrap.runApplicationAndShutdownClusterAsync()`, but there is also an important part in the 3rd commit, where the `ApplicationDispatcherBootstrap` only takes the `DispatcherGateway`.
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as the tests in the `ApplicationDispatcherBootstrapTest` that were modified.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] tillrohrmann commented on a change in pull request #13699: [FLINK-19154] ApplicationDispatcherBootstrap clean up HA data only when necessary

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #13699:
URL: https://github.com/apache/flink/pull/13699#discussion_r509260094



##########
File path: flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
##########
@@ -233,7 +234,7 @@ public void testApplicationFailsAsSoonAsOneJobFails() throws Throwable {
 
 		final CompletableFuture<Void> applicationFuture = runApplication(dispatcherBuilder, 2);
 
-		assertException(applicationFuture, JobExecutionException.class);
+		assertException(applicationFuture, ApplicationFailedException.class);

Review comment:
       The fact that this worked is actually an indicator that the `ApplicationDispatcherBootstrap` class follows a rather procedural style. Maybe with the change that it now takes a `DispatcherGateway`, for which we have easy test implementations, one might rethink it.







[GitHub] [flink] tillrohrmann commented on a change in pull request #13699: [FLINK-19154] ApplicationDispatcherBootstrap clean up HA data only when necessary

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #13699:
URL: https://github.com/apache/flink/pull/13699#discussion_r508486516



##########
File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/exceptions/ApplicationFailureException.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.application.exceptions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.SerializedThrowable;
+
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Generic exception to signal the failure of an application.
+ * This is to be subclassed by more specific exceptions.
+ */
+@Internal
+public class ApplicationFailureException extends FlinkException {

Review comment:
       Instead of having multiple subclasses of `ApplicationFailureException`, we could have a single `ApplicationExecutionException` that also contains the application status as a field. Then one could match on the `ApplicationStatus` instead of on the different subclasses. This might be a bit more robust.
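
A minimal sketch of the suggestion above, assuming nothing about the final Flink code: `Status`, `StatusCarryingException`, and `StatusMatching` are hypothetical stand-ins (not Flink classes), and the cause-chain walk only imitates what a helper such as `ExceptionUtils.findThrowable` is used for in the diff.

```java
import java.util.Optional;

/** Hypothetical stand-in for an application status enum; not the Flink type. */
enum Status { SUCCEEDED, FAILED, CANCELED, UNKNOWN }

/** Hypothetical single exception type that carries the status as a field. */
class StatusCarryingException extends Exception {
    private final Status status;

    StatusCarryingException(String message, Status status, Throwable cause) {
        super(message, cause);
        this.status = status;
    }

    Status getStatus() {
        return status;
    }
}

class StatusMatching {

    /** Walks the cause chain and returns the first StatusCarryingException, if any. */
    static Optional<StatusCarryingException> find(Throwable t) {
        while (t != null) {
            if (t instanceof StatusCarryingException) {
                return Optional.of((StatusCarryingException) t);
            }
            t = t.getCause();
        }
        return Optional.empty();
    }

    /** True only when a FAILED or CANCELED status was explicitly reported. */
    static boolean shouldShutDownCluster(Throwable t) {
        return find(t)
                .map(StatusCarryingException::getStatus)
                .filter(s -> s == Status.FAILED || s == Status.CANCELED)
                .isPresent();
    }
}
```

With one exception type, the decision depends on a single field rather than on which subclass happens to be thrown, so adding a new status does not require a new class.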

##########
File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -155,8 +160,9 @@ public void stop() {
 						}
 
 						LOG.warn("Exiting with Application Status UNKNOWN: ", t);
-						return dispatcher.shutDownCluster(ApplicationStatus.UNKNOWN);
 
+						this.errorHandler.onFatalError(t);

Review comment:
       Maybe add an exception message `new FlinkException("Application failed unexpectedly.", t)` or so.

##########
File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -73,35 +72,38 @@
  * if it should submit a job for execution (in case of a new job) or the job was already recovered and is running.
  */
 @Internal
-public class ApplicationDispatcherBootstrap extends AbstractDispatcherBootstrap {
+public class ApplicationDispatcherBootstrap implements DispatcherBootstrap {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ApplicationDispatcherBootstrap.class);
 
 	public static final JobID ZERO_JOB_ID = new JobID(0, 0);
 
 	private final PackagedProgram application;
 
-	private final Collection<JobGraph> recoveredJobs;
+	private final Collection<JobID> recoveredJobIds;
 
 	private final Configuration configuration;
 
+	private final FatalErrorHandler errorHandler;
+
 	private CompletableFuture<Void> applicationCompletionFuture;
 
 	private ScheduledFuture<?> applicationExecutionTask;
 
 	public ApplicationDispatcherBootstrap(
 			final PackagedProgram application,
-			final Collection<JobGraph> recoveredJobs,
-			final Configuration configuration) {
+			final Collection<JobID> recoveredJobIds,
+			final Configuration configuration,
+			final FatalErrorHandler errorHandler) {
 		this.configuration = checkNotNull(configuration);
-		this.recoveredJobs = checkNotNull(recoveredJobs);
+		this.recoveredJobIds = checkNotNull(recoveredJobIds);
 		this.application = checkNotNull(application);
+		this.errorHandler = checkNotNull(errorHandler);
 	}
 
 	@Override
-	public void initialize(final Dispatcher dispatcher, ScheduledExecutor scheduledExecutor) {
+	public void initialize(final DispatcherGateway dispatcher, ScheduledExecutor scheduledExecutor) {

Review comment:
       ```suggestion
   	public void initialize(final DispatcherGateway dispatcherGateway, ScheduledExecutor scheduledExecutor) {
   ```

##########
File path: flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
##########
@@ -454,22 +454,22 @@ public void testClusterShutdownWhenApplicationFails() throws Exception {
 
 		final ApplicationDispatcherBootstrap bootstrap =
 				new ApplicationDispatcherBootstrap(
-						program, Collections.emptyList(), configuration);
+						program, Collections.emptyList(), configuration, exception -> {});
 
 		return bootstrap.fixJobIdAndRunApplicationAsync(
 				dispatcherBuilder.build(),
 				scheduledExecutor);
 	}
 
 	private ApplicationDispatcherBootstrap createApplicationDispatcherBootstrap(int noOfJobs) throws FlinkException {
-		return createApplicationDispatcherBootstrap(noOfJobs, Collections.emptyList());
+		return createApplicationDispatcherBootstrap(noOfJobs, exception -> {});
 	}
 
 	private ApplicationDispatcherBootstrap createApplicationDispatcherBootstrap(
 			int noOfJobs,
-			Collection<JobGraph> recoveredJobGraphs) throws FlinkException {
+			FatalErrorHandler errorHandler) throws FlinkException {
 		final PackagedProgram program = getProgram(noOfJobs);
-		return new ApplicationDispatcherBootstrap(program, recoveredJobGraphs, getConfiguration());
+		return new ApplicationDispatcherBootstrap(program, Collections.emptyList(), getConfiguration(), errorHandler);

Review comment:
       I think we are lacking a test which ensures that in case of an exceptionally completed result from `fixJobIdAndRunApplicationAsync`, which is not caused by a `FAILED` or `CANCELLED` job, we don't call `DispatcherGateway.shutDownCluster()`.

##########
File path: flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
##########
@@ -311,66 +317,61 @@ public void testApplicationIsStoppedWhenStoppingBootstrap() throws Exception {
 
 		bootstrap.stop();

Review comment:
       Just a side comment. Cancelling futures in `ApplicationDispatcherBootstrap` does not stop the underlying execution of the `CompletableFuture`.
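
The side comment above can be illustrated with plain JDK classes. This is a sketch, not code from the PR: `CancelSketch` and its method name are hypothetical. It relies on the documented behaviour that the `mayInterruptIfRunning` argument of `CompletableFuture.cancel` has no effect, so the task body keeps running after the future is cancelled.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class CancelSketch {

    /** Returns true if the task body ran to the end although its future was cancelled. */
    static boolean bodyFinishesAfterCancel() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(1);
        AtomicBoolean finished = new AtomicBoolean(false);

        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            started.countDown();
            try {
                // Would throw InterruptedException if cancel() interrupted this thread.
                release.await();
                finished.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                done.countDown();
            }
        });

        try {
            started.await();          // make sure the body is already running
            future.cancel(true);      // completes the future, does not interrupt the body
            release.countDown();      // let the body continue
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return future.isCancelled() && finished.get();
    }
}
```

Callers see a cancelled future, while the work itself continues unless it checks some cancellation signal of its own.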

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -208,7 +210,8 @@ public void onStart() throws Exception {
 			throw exception;
 		}
 
-		dispatcherBootstrap.initialize(this, this.getRpcService().getScheduledExecutor());
+		this.dispatcherBootstrap = this.dispatcherBootstrapFactory.apply(shutDownFuture::completeExceptionally);
+		this.dispatcherBootstrap.initialize(this, this.getRpcService().getScheduledExecutor());

Review comment:
       Could `initialize` be merged with the creation step of the `ApplicationDispatcherBootstrap`?

##########
File path: flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
##########
@@ -233,7 +234,7 @@ public void testApplicationFailsAsSoonAsOneJobFails() throws Throwable {
 
 		final CompletableFuture<Void> applicationFuture = runApplication(dispatcherBuilder, 2);
 
-		assertException(applicationFuture, JobExecutionException.class);
+		assertException(applicationFuture, ApplicationFailedException.class);

Review comment:
       As a side comment concerning the `ApplicationDispatcherBootstrapTest`: It seems a bit strange that I couldn't find a place in the tests where we are calling `ApplicationDispatcherBootstrap.initialize()`. Looking at the interface, this looks like a very important call to do before doing any other operations on the implementations.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -208,7 +210,8 @@ public void onStart() throws Exception {
 			throw exception;
 		}
 
-		dispatcherBootstrap.initialize(this, this.getRpcService().getScheduledExecutor());
+		this.dispatcherBootstrap = this.dispatcherBootstrapFactory.apply(shutDownFuture::completeExceptionally);

Review comment:
       Why aren't we passing in the `fatalErrorHandler` or `Dispatcher::onFatalError`?







[GitHub] [flink] flinkbot edited a comment on pull request #13699: [FLINK-19154] ApplicationDispatcherBootstrap clean up HA data only when necessary

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13699:
URL: https://github.com/apache/flink/pull/13699#issuecomment-712700391


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7919",
       "triggerID" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "triggerType" : "PUSH"
     }, {
       "hash" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7925",
       "triggerID" : "712715009",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * 30c315c3eed0e0f32ceb06d3f7c64375c54c5014 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7919) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7925) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13699: [FLINK-19154] ApplicationDispatcherBootstrap clean up HA data only when necessary

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13699:
URL: https://github.com/apache/flink/pull/13699#issuecomment-712700391


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7919",
       "triggerID" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "triggerType" : "PUSH"
     }, {
       "hash" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7925",
       "triggerID" : "712715009",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "686ff4673e0f517c56cee605e15fa002c22a06c1",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8029",
       "triggerID" : "686ff4673e0f517c56cee605e15fa002c22a06c1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "474f3336e9e16b830050921b634c4be4e4d70436",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "474f3336e9e16b830050921b634c4be4e4d70436",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 686ff4673e0f517c56cee605e15fa002c22a06c1 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8029) 
   * 474f3336e9e16b830050921b634c4be4e4d70436 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] kl0u commented on pull request #13699: [FLINK-19154] ApplicationDispatcherBootstrap clean up HA data only when necessary

Posted by GitBox <gi...@apache.org>.
kl0u commented on pull request #13699:
URL: https://github.com/apache/flink/pull/13699#issuecomment-712715009


   @flinkbot run azure





[GitHub] [flink] tillrohrmann commented on a change in pull request #13699: [FLINK-19154] ApplicationDispatcherBootstrap clean up HA data only when necessary

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #13699:
URL: https://github.com/apache/flink/pull/13699#discussion_r510912135



##########
File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -124,62 +125,64 @@ public void stop() {
 		return applicationExecutionTask;
 	}
 
-	/**
-	 * Runs the user program entrypoint using {@link #runApplicationAsync(DispatcherGateway,
-	 * ScheduledExecutor, boolean)} and shuts down the given dispatcher when the application
-	 * completes (either successfully or in case of failure).
-	 */
 	@VisibleForTesting
-	CompletableFuture<Acknowledge> runApplicationAndShutdownClusterAsync(
-			final DispatcherGateway dispatcher,
-			final ScheduledExecutor scheduledExecutor) {
+	CompletableFuture<Void> getApplicationCompletionFuture() {
+		return applicationCompletionFuture;
+	}
 
-		applicationCompletionFuture = fixJobIdAndRunApplicationAsync(dispatcher, scheduledExecutor);
+	@VisibleForTesting
+	CompletableFuture<Acknowledge> getClusterShutdownFuture() {
+		return clusterShutdownFuture;
+	}
 
+	/**
+	 * Runs the user program entrypoint and shuts down the given dispatcherGateway when
+	 * the application completes (either successfully or in case of failure).
+	 */
+	private CompletableFuture<Acknowledge> runApplicationAndShutdownClusterAsync(final DispatcherGateway dispatcherGateway) {
 		return applicationCompletionFuture
 				.handle((r, t) -> {
-					final ApplicationStatus applicationStatus;
-					if (t != null) {
-
-						final Optional<JobCancellationException> cancellationException =
-								ExceptionUtils.findThrowable(t, JobCancellationException.class);
-
-						if (cancellationException.isPresent()) {
-							// this means the Flink Job was cancelled
-							applicationStatus = ApplicationStatus.CANCELED;
-						} else if (t instanceof CancellationException) {
-							// this means that the future was cancelled
-							applicationStatus = ApplicationStatus.UNKNOWN;
-						} else {
-							applicationStatus = ApplicationStatus.FAILED;
-						}
 
-						LOG.warn("Application {}: ", applicationStatus, t);
-					} else {
-						applicationStatus = ApplicationStatus.SUCCEEDED;
+					if (t == null) {
 						LOG.info("Application completed SUCCESSFULLY");
+						return dispatcherGateway.shutDownCluster(ApplicationStatus.SUCCEEDED);
+					}
+
+					final Optional<ApplicationFailureException> exception =
+							ExceptionUtils.findThrowable(t, ApplicationFailureException.class);
+
+					if (exception.isPresent()) {
+						final ApplicationStatus applicationStatus = exception.get().getStatus();
+
+						if (applicationStatus == ApplicationStatus.CANCELED || applicationStatus == ApplicationStatus.FAILED) {
+							LOG.info("Application {}: ", applicationStatus, t);
+							return dispatcherGateway.shutDownCluster(applicationStatus);
+						}
 					}
-					return dispatcher.shutDownCluster(applicationStatus);
+
+					LOG.warn("Exiting with Application Status UNKNOWN: ", t);

Review comment:
       Maybe 
   ```suggestion
   					LOG.warn("Application failed unexpectedly: ", t);
   ```

##########
File path: flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
##########
@@ -301,69 +306,132 @@ public void testApplicationIsStoppedWhenStoppingBootstrap() throws Exception {
 				.setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
 				.setRequestJobStatusFunction(jobId -> CompletableFuture.completedFuture(JobStatus.RUNNING));
 
-		ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap(3);
+		// we're "listening" on this to be completed to verify that the error handler is called.
+		// In production, this will shut down the cluster with an exception.
+		final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>();
+		final ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap(
+				3, dispatcherBuilder.build(), scheduledExecutor, errorHandlerFuture::completeExceptionally);
 
-		final CompletableFuture<Acknowledge> shutdownFuture =
-				bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), scheduledExecutor);
+		final CompletableFuture<Acknowledge> shutdownFuture = bootstrap.getClusterShutdownFuture();
 
 		ScheduledFuture<?> applicationExecutionFuture = bootstrap.getApplicationExecutionFuture();
 
 		bootstrap.stop();
 
-		// wait until the bootstrap "thinks" it's done
-		shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+		// we call the error handler
+		assertException(errorHandlerFuture, CancellationException.class);
+
+		// we return a future that is completed exceptionally
+		assertException(shutdownFuture, CancellationException.class);
 
 		// verify that the application task is being cancelled
 		assertThat(applicationExecutionFuture.isCancelled(), is(true));
 	}
 
 	@Test
-	public void testClusterShutdownWhenStoppingBootstrap() throws Exception {
+	public void testErrorHandlerIsCalledWhenStoppingBootstrap() throws Exception {
+		final TestingDispatcherGateway.Builder dispatcherBuilder = new TestingDispatcherGateway.Builder()
+				.setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
+				.setRequestJobStatusFunction(jobId -> CompletableFuture.completedFuture(JobStatus.RUNNING));
+
+		// we're "listening" on this to be completed to verify that the error handler is called.
+		// In production, this will shut down the cluster with an exception.
+		final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>();
+		final ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap(
+				2, dispatcherBuilder.build(), scheduledExecutor, errorHandlerFuture::completeExceptionally);
+
+		final CompletableFuture<Acknowledge> shutdownFuture = bootstrap.getClusterShutdownFuture();
+
+		bootstrap.stop();
+
+		// we call the error handler
+		assertException(errorHandlerFuture, CancellationException.class);
+
+		// we return a future that is completed exceptionally
+		assertException(shutdownFuture, CancellationException.class);
+	}
+
+	@Test
+	public void testErrorHandlerIsCalledWhenSubmissionFails() throws Exception {
+		final Tuple1<Boolean> clusterShutdown = new Tuple1<>(false);
+		final TestingDispatcherGateway.Builder dispatcherBuilder = new TestingDispatcherGateway.Builder()
+				.setSubmitFunction(jobGraph -> {
+					throw new FlinkRuntimeException("Nope!");
+				})
+				.setClusterShutdownFunction(status -> {
+					if (clusterShutdown.f0) {
+						throw new FlinkRuntimeException("This should be called only once");
+					}
+					clusterShutdown.f0 = true;
+					return CompletableFuture.completedFuture(Acknowledge.get());
+				});
+
+		// we're "listening" on this to be completed to verify that the error handler is called.
+		// In production, this will shut down the cluster with an exception.
+		final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>();
+		final TestingDispatcherGateway dispatcherGateway = dispatcherBuilder.build();
+		final ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap(
+				3, dispatcherGateway, scheduledExecutor, errorHandlerFuture::completeExceptionally);
+
+		final CompletableFuture<Acknowledge> shutdownFuture = bootstrap.getClusterShutdownFuture();
+
+		// we call the error handler
+		assertException(errorHandlerFuture, ApplicationExecutionException.class);
+
+		// we return a future that is completed exceptionally
+		assertException(shutdownFuture, ApplicationExecutionException.class);
+
+		assertFalse(clusterShutdown.f0);
+		dispatcherGateway.shutDownCluster(ApplicationStatus.SUCCEEDED);
+		assertTrue(clusterShutdown.f0);

Review comment:
       Why is this necessary? Couldn't we write the test without it and then directly fail if `shutDownCluster` is being called?
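
One way to read the question above, sketched with hypothetical names (`FakeGateway`, `onApplicationDone`, and `shutdownCalled` are illustrative and not part of Flink or its test utilities): the test records whether the shutdown hook was invoked and asserts on that flag, so it never needs to call `shutDownCluster` itself. Recording the call is used here instead of throwing from the hook, on the assumption that an exception thrown inside a future callback would be captured by the future rather than fail the test directly.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

class ShutdownNotCalledSketch {

    /** Hypothetical gateway whose shutdown behaviour is injected by the test. */
    static final class FakeGateway {
        private final Function<String, CompletableFuture<Void>> shutdownFunction;

        FakeGateway(Function<String, CompletableFuture<Void>> shutdownFunction) {
            this.shutdownFunction = shutdownFunction;
        }

        CompletableFuture<Void> shutDownCluster(String status) {
            return shutdownFunction.apply(status);
        }
    }

    /** Hypothetical code under test: shuts down on success, otherwise only reports the error. */
    static CompletableFuture<Void> onApplicationDone(FakeGateway gateway, Throwable error) {
        if (error == null) {
            return gateway.shutDownCluster("SUCCEEDED");
        }
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(error);
        return failed;
    }

    /** Runs the code under test and reports whether the shutdown hook was invoked. */
    static boolean shutdownCalled(Throwable error) {
        AtomicBoolean called = new AtomicBoolean(false);
        FakeGateway gateway = new FakeGateway(status -> {
            called.set(true);
            return CompletableFuture.completedFuture(null);
        });
        onApplicationDone(gateway, error);
        return called.get();
    }
}
```

A test built this way needs a single negative assertion on the flag after the application future has completed exceptionally.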

##########
File path: flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
##########
@@ -373,57 +441,61 @@ public void testClusterShutdownWhenSubmissionFails() throws Exception {
 	}
 
 	@Test
-	public void testClusterShutdownWhenApplicationSucceeds() throws Exception {
+	public void testClusterShutdownWhenApplicationGetsCancelled() throws Exception {
 		// we're "listening" on this to be completed to verify that the cluster
 		// is being shut down from the ApplicationDispatcherBootstrap
 		final CompletableFuture<ApplicationStatus> externalShutdownFuture = new CompletableFuture<>();
 
 		final TestingDispatcherGateway.Builder dispatcherBuilder = new TestingDispatcherGateway.Builder()
 				.setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
-				.setRequestJobStatusFunction(jobId -> CompletableFuture.completedFuture(JobStatus.FINISHED))
-				.setRequestJobResultFunction(jobId -> CompletableFuture.completedFuture(createSuccessfulJobResult(jobId)))
+				.setRequestJobStatusFunction(jobId -> CompletableFuture.completedFuture(JobStatus.CANCELED))
+				.setRequestJobResultFunction(jobId -> CompletableFuture.completedFuture(createCancelledJobResult(jobId)))
 				.setClusterShutdownFunction((status) -> {
 					externalShutdownFuture.complete(status);
 					return CompletableFuture.completedFuture(Acknowledge.get());
 				});
 
-		ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap(3);
+		ApplicationDispatcherBootstrap bootstrap =
+				createApplicationDispatcherBootstrap(3, dispatcherBuilder.build(), scheduledExecutor);
 
-		final CompletableFuture<Acknowledge> shutdownFuture =
-				bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), scheduledExecutor);
+		final CompletableFuture<Acknowledge> shutdownFuture = bootstrap.getClusterShutdownFuture();
 
 		// wait until the bootstrap "thinks" it's done
 		shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
 
 		// verify that the dispatcher is actually being shut down
-		assertThat(externalShutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS), is(ApplicationStatus.SUCCEEDED));
+		assertThat(externalShutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS), is(ApplicationStatus.CANCELED));
 	}
 
 	@Test
-	public void testClusterShutdownWhenApplicationFails() throws Exception {
+	public void testClusterDoesNOTShutdownWhenApplicationStatusUknown() throws Exception {
 		// we're "listening" on this to be completed to verify that the cluster
 		// is being shut down from the ApplicationDispatcherBootstrap
-		final CompletableFuture<ApplicationStatus> externalShutdownFuture = new CompletableFuture<>();
-
+		final Tuple1<Boolean> clusterShutdown = new Tuple1<>(false);
 		final TestingDispatcherGateway.Builder dispatcherBuilder = new TestingDispatcherGateway.Builder()
 				.setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
 				.setRequestJobStatusFunction(jobId -> CompletableFuture.completedFuture(JobStatus.FAILED))
-				.setRequestJobResultFunction(jobId -> CompletableFuture.completedFuture(createFailedJobResult(jobId)))
-				.setClusterShutdownFunction((status) -> {
-					externalShutdownFuture.complete(status);
+				.setRequestJobResultFunction(jobId -> CompletableFuture.completedFuture(createUnknownJobResult(jobId)))
+				.setClusterShutdownFunction(status -> {
+					if (clusterShutdown.f0) {
+						throw new FlinkRuntimeException("This should be called only once");
+					}
+					clusterShutdown.f0 = true;
 					return CompletableFuture.completedFuture(Acknowledge.get());
 				});
 
-		ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap(3);
+		final TestingDispatcherGateway dispatcherGateway = dispatcherBuilder.build();
+		final ApplicationDispatcherBootstrap bootstrap =
+				createApplicationDispatcherBootstrap(3, dispatcherGateway, scheduledExecutor);
 
-		final CompletableFuture<Acknowledge> shutdownFuture =
-				bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), scheduledExecutor);
+		final CompletableFuture<Acknowledge> applicationFuture = bootstrap.getClusterShutdownFuture();
 
-		// wait until the bootstrap "thinks" it's done
-		shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+		final ApplicationFailureException exception = assertException(applicationFuture, ApplicationFailureException.class);
+		assertEquals(exception.getStatus(), ApplicationStatus.UNKNOWN);
 
-		// verify that the dispatcher is actually being shut down
-		assertThat(externalShutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS), is(ApplicationStatus.FAILED));
+		assertFalse(clusterShutdown.f0);
+		dispatcherGateway.shutDownCluster(ApplicationStatus.SUCCEEDED);
+		assertTrue(clusterShutdown.f0);

Review comment:
       Same here, why do we need this?

##########
File path: flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
##########
@@ -301,69 +306,132 @@ public void testApplicationIsStoppedWhenStoppingBootstrap() throws Exception {
 				.setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
 				.setRequestJobStatusFunction(jobId -> CompletableFuture.completedFuture(JobStatus.RUNNING));
 
-		ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap(3);
+		// we're "listening" on this to be completed to verify that the error handler is called.
+		// In production, this will shut down the cluster with an exception.
+		final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>();
+		final ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap(
+				3, dispatcherBuilder.build(), scheduledExecutor, errorHandlerFuture::completeExceptionally);
 
-		final CompletableFuture<Acknowledge> shutdownFuture =
-				bootstrap.runApplicationAndShutdownClusterAsync(dispatcherBuilder.build(), scheduledExecutor);
+		final CompletableFuture<Acknowledge> shutdownFuture = bootstrap.getClusterShutdownFuture();
 
 		ScheduledFuture<?> applicationExecutionFuture = bootstrap.getApplicationExecutionFuture();
 
 		bootstrap.stop();
 
-		// wait until the bootstrap "thinks" it's done
-		shutdownFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+		// we call the error handler
+		assertException(errorHandlerFuture, CancellationException.class);
+
+		// we return a future that is completed exceptionally
+		assertException(shutdownFuture, CancellationException.class);
 
 		// verify that the application task is being cancelled
 		assertThat(applicationExecutionFuture.isCancelled(), is(true));
 	}
 
 	@Test
-	public void testClusterShutdownWhenStoppingBootstrap() throws Exception {
+	public void testErrorHandlerIsCalledWhenStoppingBootstrap() throws Exception {
+		final TestingDispatcherGateway.Builder dispatcherBuilder = new TestingDispatcherGateway.Builder()
+				.setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
+				.setRequestJobStatusFunction(jobId -> CompletableFuture.completedFuture(JobStatus.RUNNING));
+
+		// we're "listening" on this to be completed to verify that the error handler is called.
+		// In production, this will shut down the cluster with an exception.
+		final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>();
+		final ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap(
+				2, dispatcherBuilder.build(), scheduledExecutor, errorHandlerFuture::completeExceptionally);
+
+		final CompletableFuture<Acknowledge> shutdownFuture = bootstrap.getClusterShutdownFuture();
+
+		bootstrap.stop();
+
+		// we call the error handler
+		assertException(errorHandlerFuture, CancellationException.class);
+
+		// we return a future that is completed exceptionally
+		assertException(shutdownFuture, CancellationException.class);
+	}
+
+	@Test
+	public void testErrorHandlerIsCalledWhenSubmissionFails() throws Exception {
+		final Tuple1<Boolean> clusterShutdown = new Tuple1<>(false);
+		final TestingDispatcherGateway.Builder dispatcherBuilder = new TestingDispatcherGateway.Builder()
+				.setSubmitFunction(jobGraph -> {
+					throw new FlinkRuntimeException("Nope!");
+				})
+				.setClusterShutdownFunction(status -> {
+					if (clusterShutdown.f0) {
+						throw new FlinkRuntimeException("This should be called only once");
+					}
+					clusterShutdown.f0 = true;
+					return CompletableFuture.completedFuture(Acknowledge.get());
+				});
+
+		// we're "listening" on this to be completed to verify that the error handler is called.
+		// In production, this will shut down the cluster with an exception.
+		final CompletableFuture<Void> errorHandlerFuture = new CompletableFuture<>();
+		final TestingDispatcherGateway dispatcherGateway = dispatcherBuilder.build();
+		final ApplicationDispatcherBootstrap bootstrap = createApplicationDispatcherBootstrap(
+				3, dispatcherGateway, scheduledExecutor, errorHandlerFuture::completeExceptionally);
+
+		final CompletableFuture<Acknowledge> shutdownFuture = bootstrap.getClusterShutdownFuture();
+
+		// we call the error handler
+		assertException(errorHandlerFuture, ApplicationExecutionException.class);
+
+		// we return a future that is completed exceptionally
+		assertException(shutdownFuture, ApplicationExecutionException.class);
+
+		assertFalse(clusterShutdown.f0);
+		dispatcherGateway.shutDownCluster(ApplicationStatus.SUCCEEDED);
+		assertTrue(clusterShutdown.f0);

Review comment:
       Or we simply add a counter to `setClusterShutdownFunction` and assert that its value is `0` at the end of the test?
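
       A minimal, self-contained sketch of the counter idea follows. `FakeGateway` and `runApplication` are hypothetical stand-ins invented for illustration (they are not `TestingDispatcherGateway` or `ApplicationDispatcherBootstrap`), and the status is a plain string rather than `ApplicationStatus`. The shutdown function only increments a counter, so the test can assert on the count instead of toggling a flag and calling `shutDownCluster` by hand.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Self-contained sketch; FakeGateway and runApplication are hypothetical stand-ins,
// not Flink's TestingDispatcherGateway or ApplicationDispatcherBootstrap.
class ShutdownCounterSketch {

    /** Minimal gateway whose shutdown behaviour is supplied by the test. */
    static final class FakeGateway {
        private final Function<String, CompletableFuture<Void>> clusterShutdownFunction;

        FakeGateway(Function<String, CompletableFuture<Void>> clusterShutdownFunction) {
            this.clusterShutdownFunction = clusterShutdownFunction;
        }

        CompletableFuture<Void> shutDownCluster(String status) {
            return clusterShutdownFunction.apply(status);
        }
    }

    /**
     * Stand-in for the code under test: a failed submission completes the returned
     * future exceptionally and never touches the gateway; a successful run shuts
     * the cluster down exactly once.
     */
    static CompletableFuture<Void> runApplication(FakeGateway gateway, boolean submissionFails) {
        if (submissionFails) {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("Nope!"));
            return failed;
        }
        return gateway.shutDownCluster("SUCCEEDED");
    }

    /** Returns how many times the shutdown function was invoked during one run. */
    static int countShutdownCalls(boolean submissionFails) {
        AtomicInteger shutdownCalls = new AtomicInteger();
        FakeGateway gateway = new FakeGateway(status -> {
            shutdownCalls.incrementAndGet();
            return CompletableFuture.completedFuture(null);
        });
        runApplication(gateway, submissionFails);
        return shutdownCalls.get();
    }
}
```

       With this shape the failure-path test ends with a single assertion that the counter is `0`, and a success-path test can assert that it is `1`.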




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13699: [FLINK-19154] ApplicationDispatcherBootstrap clean up HA data only when necessary

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13699:
URL: https://github.com/apache/flink/pull/13699#issuecomment-712700391


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7919",
       "triggerID" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "triggerType" : "PUSH"
     }, {
       "hash" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7925",
       "triggerID" : "712715009",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * 30c315c3eed0e0f32ceb06d3f7c64375c54c5014 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7919) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7925) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13699: [FLINK-19154] ApplicationDispatcherBootstrap clean up HA data only when necessary

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13699:
URL: https://github.com/apache/flink/pull/13699#issuecomment-712700391


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7919",
       "triggerID" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "triggerType" : "PUSH"
     }, {
       "hash" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7925",
       "triggerID" : "712715009",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "686ff4673e0f517c56cee605e15fa002c22a06c1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8029",
       "triggerID" : "686ff4673e0f517c56cee605e15fa002c22a06c1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "474f3336e9e16b830050921b634c4be4e4d70436",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8043",
       "triggerID" : "474f3336e9e16b830050921b634c4be4e4d70436",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5373cbacec0ef3276a214bf67be7de9387c2c88b",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "5373cbacec0ef3276a214bf67be7de9387c2c88b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 474f3336e9e16b830050921b634c4be4e4d70436 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8043) 
   * 5373cbacec0ef3276a214bf67be7de9387c2c88b UNKNOWN
   



[GitHub] [flink] kl0u closed pull request #13699: [FLINK-19154] ApplicationDispatcherBootstrap clean up HA data only when necessary

Posted by GitBox <gi...@apache.org>.
kl0u closed pull request #13699:
URL: https://github.com/apache/flink/pull/13699


   





[GitHub] [flink] flinkbot edited a comment on pull request #13699: [FLINK-19154] ApplicationDispatcherBootstrap clean up HA data only when necessary

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13699:
URL: https://github.com/apache/flink/pull/13699#issuecomment-712700391


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7919",
       "triggerID" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "triggerType" : "PUSH"
     }, {
       "hash" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7925",
       "triggerID" : "712715009",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "686ff4673e0f517c56cee605e15fa002c22a06c1",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8029",
       "triggerID" : "686ff4673e0f517c56cee605e15fa002c22a06c1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "474f3336e9e16b830050921b634c4be4e4d70436",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8043",
       "triggerID" : "474f3336e9e16b830050921b634c4be4e4d70436",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 686ff4673e0f517c56cee605e15fa002c22a06c1 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8029) 
   * 474f3336e9e16b830050921b634c4be4e4d70436 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8043) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #13699: [FLINK-19154] ApplicationDispatcherBootstrap clean up HA data only when necessary

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13699:
URL: https://github.com/apache/flink/pull/13699#issuecomment-712700391


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7919",
       "triggerID" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "triggerType" : "PUSH"
     }, {
       "hash" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7925",
       "triggerID" : "712715009",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "686ff4673e0f517c56cee605e15fa002c22a06c1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8029",
       "triggerID" : "686ff4673e0f517c56cee605e15fa002c22a06c1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "474f3336e9e16b830050921b634c4be4e4d70436",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8043",
       "triggerID" : "474f3336e9e16b830050921b634c4be4e4d70436",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 474f3336e9e16b830050921b634c4be4e4d70436 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8043) 
   



[GitHub] [flink] kl0u commented on a change in pull request #13699: [FLINK-19154] ApplicationDispatcherBootstrap clean up HA data only when necessary

Posted by GitBox <gi...@apache.org>.
kl0u commented on a change in pull request #13699:
URL: https://github.com/apache/flink/pull/13699#discussion_r509178005



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -208,7 +210,8 @@ public void onStart() throws Exception {
 			throw exception;
 		}
 
-		dispatcherBootstrap.initialize(this, this.getRpcService().getScheduledExecutor());
+		this.dispatcherBootstrap = this.dispatcherBootstrapFactory.apply(shutDownFuture::completeExceptionally);

Review comment:
       The reason for this is that this would bubble up to `ClusterEntrypoint` calling `System.exit(RUNTIME_FAILURE_RETURN_CODE);` and I do not know if this will kill the cluster without deleting the HA data.
   
   If this is the contract, then I can do it.
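
   For readers following the diff above, here is a small self-contained sketch of the wiring pattern in the added line, where `shutDownFuture::completeExceptionally` is handed to the bootstrap as its error handler. `FakeBootstrap` is a hypothetical stand-in, and the handler type `Consumer<Throwable>` is an assumption made for the sketch; it says nothing about what `ClusterEntrypoint` does afterwards with the failed future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;

// Self-contained sketch; FakeBootstrap is a hypothetical stand-in, not a Flink class.
class ErrorHandlerSketch {

    /** Reports unexpected failures to whatever error handler it was constructed with. */
    static final class FakeBootstrap {
        private final Consumer<Throwable> errorHandler;

        FakeBootstrap(Consumer<Throwable> errorHandler) {
            this.errorHandler = errorHandler;
        }

        void fail(Throwable cause) {
            errorHandler.accept(cause);
        }
    }

    /** Returns the failure that the owner of the shutdown future observes, or null if none. */
    static Throwable failureSeenByOwner(Throwable cause) {
        CompletableFuture<Void> shutDownFuture = new CompletableFuture<>();

        // completeExceptionally returns a boolean, but a method reference to it
        // still fits Consumer<Throwable>; the return value is simply discarded.
        FakeBootstrap bootstrap = new FakeBootstrap(shutDownFuture::completeExceptionally);
        bootstrap.fail(cause);

        try {
            shutDownFuture.get();
            return null;
        } catch (ExecutionException e) {
            // get() wraps the original failure; unwrap it for the caller.
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
    }
}
```

   The point of the pattern is that the bootstrap never decides how the process exits; it only reports the failure, and whoever owns the future decides what a failed shutdown future means.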







[GitHub] [flink] flinkbot edited a comment on pull request #13699: [FLINK-19154] ApplicationDispatcherBootstrap clean up HA data only when necessary

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13699:
URL: https://github.com/apache/flink/pull/13699#issuecomment-712700391


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7919",
       "triggerID" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "triggerType" : "PUSH"
     }, {
       "hash" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7925",
       "triggerID" : "712715009",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "686ff4673e0f517c56cee605e15fa002c22a06c1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8029",
       "triggerID" : "686ff4673e0f517c56cee605e15fa002c22a06c1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "474f3336e9e16b830050921b634c4be4e4d70436",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8043",
       "triggerID" : "474f3336e9e16b830050921b634c4be4e4d70436",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5373cbacec0ef3276a214bf67be7de9387c2c88b",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8280",
       "triggerID" : "5373cbacec0ef3276a214bf67be7de9387c2c88b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5373cbacec0ef3276a214bf67be7de9387c2c88b Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8280) 
   



[GitHub] [flink] flinkbot commented on pull request #13699: [FLINK-19154] ApplicationDispatcherBootstrap clean up HA data only when necessary

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13699:
URL: https://github.com/apache/flink/pull/13699#issuecomment-712700391


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 30c315c3eed0e0f32ceb06d3f7c64375c54c5014 UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #13699: [FLINK-19154] ApplicationDispatcherBootstrap clean up HA data only when necessary

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13699:
URL: https://github.com/apache/flink/pull/13699#issuecomment-712700391


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7919",
       "triggerID" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "triggerType" : "PUSH"
     }, {
       "hash" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7925",
       "triggerID" : "712715009",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "686ff4673e0f517c56cee605e15fa002c22a06c1",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8029",
       "triggerID" : "686ff4673e0f517c56cee605e15fa002c22a06c1",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 30c315c3eed0e0f32ceb06d3f7c64375c54c5014 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7919) Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7925) 
   * 686ff4673e0f517c56cee605e15fa002c22a06c1 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8029) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #13699: [FLINK-19154] ApplicationDispatcherBootstrap clean up HA data only when necessary

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13699:
URL: https://github.com/apache/flink/pull/13699#issuecomment-712700391


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7919",
       "triggerID" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "triggerType" : "PUSH"
     }, {
       "hash" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7925",
       "triggerID" : "712715009",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "686ff4673e0f517c56cee605e15fa002c22a06c1",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "686ff4673e0f517c56cee605e15fa002c22a06c1",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 30c315c3eed0e0f32ceb06d3f7c64375c54c5014 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7919) Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7925) 
   * 686ff4673e0f517c56cee605e15fa002c22a06c1 UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #13699: [FLINK-19154] ApplicationDispatcherBootstrap clean up HA data only when necessary

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13699:
URL: https://github.com/apache/flink/pull/13699#issuecomment-712700391


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7919",
       "triggerID" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "triggerType" : "PUSH"
     }, {
       "hash" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7925",
       "triggerID" : "712715009",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * 30c315c3eed0e0f32ceb06d3f7c64375c54c5014 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7919) Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7925) 
   



[GitHub] [flink] kl0u commented on pull request #13699: [FLINK-19154] ApplicationDispatcherBootstrap clean up HA data only when necessary

Posted by GitBox <gi...@apache.org>.
kl0u commented on pull request #13699:
URL: https://github.com/apache/flink/pull/13699#issuecomment-716386084


   Thanks for the review @tillrohrmann ! I integrated your comments and I will merge as soon as AZP gives green.





[GitHub] [flink] kl0u commented on pull request #13699: [FLINK-19154] ApplicationDispatcherBootstrap clean up HA data only when necessary

Posted by GitBox <gi...@apache.org>.
kl0u commented on pull request #13699:
URL: https://github.com/apache/flink/pull/13699#issuecomment-713818957


   Thanks for the comments @tillrohrmann ! 
   
   I integrated them. One of your comments was to merge the `initialize()` with the constructor of the `ApplicationDispatcherBootstrap`. I am a bit torn about launching async calls in the constructor, as required by this change, but let me know what you think (it is integrated as a separate commit).





[GitHub] [flink] kl0u commented on a change in pull request #13699: [FLINK-19154] ApplicationDispatcherBootstrap clean up HA data only when necessary

Posted by GitBox <gi...@apache.org>.
kl0u commented on a change in pull request #13699:
URL: https://github.com/apache/flink/pull/13699#discussion_r509599297



##########
File path: flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
##########
@@ -233,7 +234,7 @@ public void testApplicationFailsAsSoonAsOneJobFails() throws Throwable {
 
 		final CompletableFuture<Void> applicationFuture = runApplication(dispatcherBuilder, 2);
 
-		assertException(applicationFuture, JobExecutionException.class);
+		assertException(applicationFuture, ApplicationFailedException.class);

Review comment:
       I agree that now we can rethink it, but if I understand the comment correctly, I tend to disagree that this shows that the class follows a procedural style. The only reason that this method was exposed was for testing, and it was not accessed by other classes. In that sense, the internal state of the class was not "leaking", since even these methods sealed the internal state from outside tampering.







[GitHub] [flink] kl0u commented on a change in pull request #13699: [FLINK-19154] ApplicationDispatcherBootstrap clean up HA data only when necessary

Posted by GitBox <gi...@apache.org>.
kl0u commented on a change in pull request #13699:
URL: https://github.com/apache/flink/pull/13699#discussion_r509183908



##########
File path: flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
##########
@@ -233,7 +234,7 @@ public void testApplicationFailsAsSoonAsOneJobFails() throws Throwable {
 
 		final CompletableFuture<Void> applicationFuture = runApplication(dispatcherBuilder, 2);
 
-		assertException(applicationFuture, JobExecutionException.class);
+		assertException(applicationFuture, ApplicationFailedException.class);

Review comment:
       The problem was that we were passing a `Dispatcher`, which is difficult to instantiate and overkill for such a test. This is the reason why `runApplicationAndShutdownClusterAsync()` is annotated as `visibleForTesting` and is not private, so that all the tests can call this method.







[GitHub] [flink] flinkbot edited a comment on pull request #13699: [FLINK-19154] ApplicationDispatcherBootstrap clean up HA data only when necessary

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13699:
URL: https://github.com/apache/flink/pull/13699#issuecomment-712700391


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7919",
       "triggerID" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "triggerType" : "PUSH"
     }, {
       "hash" : "30c315c3eed0e0f32ceb06d3f7c64375c54c5014",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7925",
       "triggerID" : "712715009",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "686ff4673e0f517c56cee605e15fa002c22a06c1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8029",
       "triggerID" : "686ff4673e0f517c56cee605e15fa002c22a06c1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "474f3336e9e16b830050921b634c4be4e4d70436",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8043",
       "triggerID" : "474f3336e9e16b830050921b634c4be4e4d70436",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5373cbacec0ef3276a214bf67be7de9387c2c88b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8280",
       "triggerID" : "5373cbacec0ef3276a214bf67be7de9387c2c88b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 474f3336e9e16b830050921b634c4be4e4d70436 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8043) 
   * 5373cbacec0ef3276a214bf67be7de9387c2c88b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8280) 
   