Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/20 13:43:24 UTC

[GitHub] [flink] tillrohrmann commented on a change in pull request #13699: [FLINK-19154] ApplicationDispatcherBootstrap clean up HA data only when necessary

tillrohrmann commented on a change in pull request #13699:
URL: https://github.com/apache/flink/pull/13699#discussion_r508486516



##########
File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/exceptions/ApplicationFailureException.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.application.exceptions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.SerializedThrowable;
+
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Generic exception to signal the failure of an application.
+ * This is to be subclassed by more specific exceptions.
+ */
+@Internal
+public class ApplicationFailureException extends FlinkException {

Review comment:
       Instead of having multiple subclasses of `ApplicationFailureException`, we could have a single `ApplicationExecutionException` that also carries the application status as a field. Callers could then match on the `ApplicationStatus` instead of on the different subclasses, which might be a bit more robust.
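A minimal sketch of the single-exception approach, assuming a hypothetical `ApplicationExecutionException` with a status field and a hypothetical `toShutdownStatus` helper. To keep the example self-contained, `ApplicationStatus` is a local stand-in for Flink's enum of the same name, and the exception extends `Exception` instead of `FlinkException`.

```java
import java.util.concurrent.CompletionException;

// Local stand-in for Flink's ApplicationStatus enum
// (assumption: only the constant names matter for this sketch).
enum ApplicationStatus { SUCCEEDED, FAILED, CANCELED, UNKNOWN }

// Hypothetical single exception type: the outcome is a field, not a subclass.
// Extends Exception instead of FlinkException to stay self-contained.
class ApplicationExecutionException extends Exception {
    private final ApplicationStatus status;

    ApplicationExecutionException(String message, ApplicationStatus status, Throwable cause) {
        super(message, cause);
        this.status = status;
    }

    ApplicationStatus getStatus() {
        return status;
    }
}

final class StatusMatching {
    private StatusMatching() {}

    // Hypothetical helper: decides the shutdown status by reading the status field
    // rather than by checking which subclass was thrown.
    static ApplicationStatus toShutdownStatus(Throwable failure) {
        // Dependent futures usually wrap the real cause in a CompletionException.
        Throwable cause = failure instanceof CompletionException && failure.getCause() != null
                ? failure.getCause()
                : failure;
        if (cause instanceof ApplicationExecutionException) {
            ApplicationStatus status = ((ApplicationExecutionException) cause).getStatus();
            if (status == ApplicationStatus.FAILED || status == ApplicationStatus.CANCELED) {
                return status;
            }
        }
        // Anything else is treated as an unexpected failure.
        return ApplicationStatus.UNKNOWN;
    }
}
```

With this shape, handling a new outcome means adding a branch on the status rather than a new subclass plus another `instanceof` check.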

##########
File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -155,8 +160,9 @@ public void stop() {
 						}
 
 						LOG.warn("Exiting with Application Status UNKNOWN: ", t);
-						return dispatcher.shutDownCluster(ApplicationStatus.UNKNOWN);
 
+						this.errorHandler.onFatalError(t);

Review comment:
       Maybe wrap the cause with a descriptive message, e.g. `new FlinkException("Application failed unexpectedly.", t)`.
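A sketch of the suggested wrapping, assuming local stand-ins for Flink's `FatalErrorHandler` interface and `FlinkException`; `UnexpectedFailureReporting` and `RecordingErrorHandler` are hypothetical helpers, not Flink classes.

```java
// Local stand-in for Flink's FatalErrorHandler interface.
interface FatalErrorHandler {
    void onFatalError(Throwable exception);
}

// Local stand-in for org.apache.flink.util.FlinkException.
class FlinkException extends Exception {
    FlinkException(String message, Throwable cause) {
        super(message, cause);
    }
}

final class UnexpectedFailureReporting {
    private UnexpectedFailureReporting() {}

    // Wraps the raw throwable so the reported error states what failed,
    // while the original throwable stays available as the cause.
    static void reportUnexpectedFailure(FatalErrorHandler errorHandler, Throwable t) {
        errorHandler.onFatalError(new FlinkException("Application failed unexpectedly.", t));
    }
}

// Test helper that remembers the last reported error.
final class RecordingErrorHandler implements FatalErrorHandler {
    Throwable lastError;

    @Override
    public void onFatalError(Throwable exception) {
        lastError = exception;
    }
}
```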

##########
File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -73,35 +72,38 @@
  * if it should submit a job for execution (in case of a new job) or the job was already recovered and is running.
  */
 @Internal
-public class ApplicationDispatcherBootstrap extends AbstractDispatcherBootstrap {
+public class ApplicationDispatcherBootstrap implements DispatcherBootstrap {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ApplicationDispatcherBootstrap.class);
 
 	public static final JobID ZERO_JOB_ID = new JobID(0, 0);
 
 	private final PackagedProgram application;
 
-	private final Collection<JobGraph> recoveredJobs;
+	private final Collection<JobID> recoveredJobIds;
 
 	private final Configuration configuration;
 
+	private final FatalErrorHandler errorHandler;
+
 	private CompletableFuture<Void> applicationCompletionFuture;
 
 	private ScheduledFuture<?> applicationExecutionTask;
 
 	public ApplicationDispatcherBootstrap(
 			final PackagedProgram application,
-			final Collection<JobGraph> recoveredJobs,
-			final Configuration configuration) {
+			final Collection<JobID> recoveredJobIds,
+			final Configuration configuration,
+			final FatalErrorHandler errorHandler) {
 		this.configuration = checkNotNull(configuration);
-		this.recoveredJobs = checkNotNull(recoveredJobs);
+		this.recoveredJobIds = checkNotNull(recoveredJobIds);
 		this.application = checkNotNull(application);
+		this.errorHandler = checkNotNull(errorHandler);
 	}
 
 	@Override
-	public void initialize(final Dispatcher dispatcher, ScheduledExecutor scheduledExecutor) {
+	public void initialize(final DispatcherGateway dispatcher, ScheduledExecutor scheduledExecutor) {

Review comment:
       ```suggestion
   	public void initialize(final DispatcherGateway dispatcherGateway, ScheduledExecutor scheduledExecutor) {
   ```

##########
File path: flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
##########
@@ -454,22 +454,22 @@ public void testClusterShutdownWhenApplicationFails() throws Exception {
 
 		final ApplicationDispatcherBootstrap bootstrap =
 				new ApplicationDispatcherBootstrap(
-						program, Collections.emptyList(), configuration);
+						program, Collections.emptyList(), configuration, exception -> {});
 
 		return bootstrap.fixJobIdAndRunApplicationAsync(
 				dispatcherBuilder.build(),
 				scheduledExecutor);
 	}
 
 	private ApplicationDispatcherBootstrap createApplicationDispatcherBootstrap(int noOfJobs) throws FlinkException {
-		return createApplicationDispatcherBootstrap(noOfJobs, Collections.emptyList());
+		return createApplicationDispatcherBootstrap(noOfJobs, exception -> {});
 	}
 
 	private ApplicationDispatcherBootstrap createApplicationDispatcherBootstrap(
 			int noOfJobs,
-			Collection<JobGraph> recoveredJobGraphs) throws FlinkException {
+			FatalErrorHandler errorHandler) throws FlinkException {
 		final PackagedProgram program = getProgram(noOfJobs);
-		return new ApplicationDispatcherBootstrap(program, recoveredJobGraphs, getConfiguration());
+		return new ApplicationDispatcherBootstrap(program, Collections.emptyList(), getConfiguration(), errorHandler);

Review comment:
       I think we are missing a test which ensures that, if the future returned by `fixJobIdAndRunApplicationAsync` completes exceptionally for a reason other than a `FAILED` or `CANCELED` job, we don't call `DispatcherGateway.shutDownCluster()`.
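One possible shape for the missing test, sketched under assumptions: `RecordingGateway`, `CompletionHandling` and this `ApplicationFailedException` (with a `status` field) are hypothetical stand-ins, and `onApplicationCompleted` only approximates the decision the bootstrap is expected to make. The test method completes the application future with an unrelated exception and checks that `shutDownCluster` was never called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;

// Local stand-in for Flink's ApplicationStatus enum.
enum ApplicationStatus { SUCCEEDED, FAILED, CANCELED, UNKNOWN }

// Hypothetical failure type carrying the final application status.
class ApplicationFailedException extends RuntimeException {
    final ApplicationStatus status;

    ApplicationFailedException(ApplicationStatus status) {
        super("application " + status);
        this.status = status;
    }
}

// Records shutDownCluster calls instead of talking to a real dispatcher.
final class RecordingGateway {
    final List<ApplicationStatus> shutdownCalls = new ArrayList<>();

    CompletableFuture<Void> shutDownCluster(ApplicationStatus status) {
        shutdownCalls.add(status);
        return CompletableFuture.completedFuture(null);
    }
}

final class CompletionHandling {
    private CompletionHandling() {}

    // Hypothetical logic under test: success or a FAILED/CANCELED application shuts the
    // cluster down; any other failure is reported as fatal instead.
    static void onApplicationCompleted(
            CompletableFuture<Void> applicationFuture,
            RecordingGateway gateway,
            Consumer<Throwable> errorHandler) {
        applicationFuture.whenComplete((ignored, failure) -> {
            if (failure == null) {
                gateway.shutDownCluster(ApplicationStatus.SUCCEEDED);
                return;
            }
            Throwable cause = failure instanceof CompletionException && failure.getCause() != null
                    ? failure.getCause()
                    : failure;
            if (cause instanceof ApplicationFailedException) {
                gateway.shutDownCluster(((ApplicationFailedException) cause).status);
            } else {
                errorHandler.accept(cause);
            }
        });
    }

    // The missing test: an unexpected failure must not trigger shutDownCluster().
    static void testUnexpectedFailureDoesNotShutDownCluster() {
        RecordingGateway gateway = new RecordingGateway();
        List<Throwable> fatalErrors = new ArrayList<>();
        CompletableFuture<Void> applicationFuture = new CompletableFuture<>();
        onApplicationCompleted(applicationFuture, gateway, fatalErrors::add);

        applicationFuture.completeExceptionally(new IllegalStateException("unexpected"));

        if (!gateway.shutdownCalls.isEmpty()) {
            throw new AssertionError("shutDownCluster must not be called: " + gateway.shutdownCalls);
        }
        if (fatalErrors.size() != 1) {
            throw new AssertionError("expected exactly one fatal error");
        }
    }
}
```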

##########
File path: flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
##########
@@ -311,66 +317,61 @@ public void testApplicationIsStoppedWhenStoppingBootstrap() throws Exception {
 
 		bootstrap.stop();

Review comment:
       Just a side comment: cancelling a `CompletableFuture` in `ApplicationDispatcherBootstrap` does not stop the underlying computation that would have completed it.
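A small, self-contained demonstration of this point using only the JDK: the future reports itself as cancelled, yet the task behind it keeps running and is not interrupted. `CancellationDemo` is an illustrative name; the JDK documents that `mayInterruptIfRunning` has no effect for `CompletableFuture`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class CancellationDemo {
    private CancellationDemo() {}

    // Returns true if the future is cancelled while the task behind it
    // still runs to completion without being interrupted.
    static boolean taskOutlivesCancellation() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                started.countDown();
                try {
                    release.await(); // simulates long-running application work
                } catch (InterruptedException e) {
                    interrupted.set(true);
                    Thread.currentThread().interrupt();
                }
                finished.countDown();
            }, executor);

            started.await();
            // cancel() only completes the future with a CancellationException;
            // the flag does not interrupt the running task.
            boolean cancelled = future.cancel(true);
            release.countDown();
            boolean taskFinished = finished.await(5, TimeUnit.SECONDS);
            return cancelled && future.isCancelled() && taskFinished && !interrupted.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Stopping the work itself would need a separate mechanism, such as cancelling the scheduled task or checking a flag inside the computation.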

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -208,7 +210,8 @@ public void onStart() throws Exception {
 			throw exception;
 		}
 
-		dispatcherBootstrap.initialize(this, this.getRpcService().getScheduledExecutor());
+		this.dispatcherBootstrap = this.dispatcherBootstrapFactory.apply(shutDownFuture::completeExceptionally);
+		this.dispatcherBootstrap.initialize(this, this.getRpcService().getScheduledExecutor());

Review comment:
       Could `initialize` be merged with the creation step of the `ApplicationDispatcherBootstrap`?
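One way to read this suggestion, as a sketch with hypothetical names: the factory receives everything `initialize` needs, and the constructor starts the application, so a bootstrap can never exist in a created-but-uninitialized state. `Gateway` (with its `submitJob` method), `Bootstrap` and `BootstrapFactory` are illustrative stand-ins, not Flink types.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Illustrative stand-in for DispatcherGateway; submitJob is hypothetical.
interface Gateway {
    void submitJob(String jobName);
}

// Hypothetical bootstrap whose constructor does what initialize() used to do.
final class Bootstrap {
    private final CompletableFuture<Void> applicationCompletion = new CompletableFuture<>();

    Bootstrap(Gateway gateway, ScheduledExecutorService executor, Consumer<Throwable> errorHandler) {
        // Start running the application right away instead of waiting for initialize().
        executor.schedule(() -> {
            try {
                gateway.submitJob("job-1"); // placeholder for running the user's application
                applicationCompletion.complete(null);
            } catch (RuntimeException e) {
                applicationCompletion.completeExceptionally(e);
                errorHandler.accept(e);
            }
        }, 0L, TimeUnit.MILLISECONDS);
    }

    CompletableFuture<Void> applicationCompletion() {
        return applicationCompletion;
    }
}

// Hypothetical factory: the caller must supply the gateway and executor up front,
// which merges creation and initialization into a single step.
@FunctionalInterface
interface BootstrapFactory {
    Bootstrap create(Gateway gateway, ScheduledExecutorService executor, Consumer<Throwable> errorHandler);
}
```

A side effect of this design is that tests cannot forget the initialization step, because there is no separate call to forget.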

##########
File path: flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
##########
@@ -233,7 +234,7 @@ public void testApplicationFailsAsSoonAsOneJobFails() throws Throwable {
 
 		final CompletableFuture<Void> applicationFuture = runApplication(dispatcherBuilder, 2);
 
-		assertException(applicationFuture, JobExecutionException.class);
+		assertException(applicationFuture, ApplicationFailedException.class);

Review comment:
       As a side comment on the `ApplicationDispatcherBootstrapTest`: it seems a bit strange that I couldn't find any test that calls `ApplicationDispatcherBootstrap.initialize()`. Judging from the interface, this is an important call that has to happen before any other operation on the implementations.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -208,7 +210,8 @@ public void onStart() throws Exception {
 			throw exception;
 		}
 
-		dispatcherBootstrap.initialize(this, this.getRpcService().getScheduledExecutor());
+		this.dispatcherBootstrap = this.dispatcherBootstrapFactory.apply(shutDownFuture::completeExceptionally);

Review comment:
       Why aren't we passing in the `fatalErrorHandler` or `Dispatcher::onFatalError`?
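A sketch contrasting the two options, assuming a hypothetical, stripped-down `MiniDispatcher` and a local stand-in for Flink's `FatalErrorHandler`. Both method references fit the same error-handler interface; they differ in where the error ends up: one only completes the shutdown future exceptionally, the other goes through the dispatcher's own fatal error handling.

```java
import java.util.concurrent.CompletableFuture;

// Local stand-in for Flink's FatalErrorHandler interface.
interface FatalErrorHandler {
    void onFatalError(Throwable exception);
}

// Hypothetical, stripped-down dispatcher showing the two candidate error sinks
// for the bootstrap. The shutdown future's value type is simplified to Void.
final class MiniDispatcher {
    final CompletableFuture<Void> shutDownFuture = new CompletableFuture<>();
    private final FatalErrorHandler fatalErrorHandler;

    MiniDispatcher(FatalErrorHandler fatalErrorHandler) {
        this.fatalErrorHandler = fatalErrorHandler;
    }

    void onFatalError(Throwable throwable) {
        fatalErrorHandler.onFatalError(throwable);
    }

    // What the PR passes: an error only completes the shutdown future exceptionally.
    FatalErrorHandler handlerViaShutDownFuture() {
        return shutDownFuture::completeExceptionally;
    }

    // What the review asks about: route the error through the dispatcher's fatal error handling.
    FatalErrorHandler handlerViaOnFatalError() {
        return this::onFatalError;
    }
}
```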



