Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/12/01 07:11:36 UTC

[GitHub] [flink] xintongsong commented on a diff in pull request #21347: [FLINK-29640][Client/Job Submission]Enhance the function configured by execution.shutdown-on-attached-exi…

xintongsong commented on code in PR #21347:
URL: https://github.com/apache/flink/pull/21347#discussion_r1036742632


##########
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java:
##########
@@ -271,4 +271,9 @@ default CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoor
             @RpcTimeout Time timeout) {
         throw new UnsupportedOperationException();
     }
+
+    /** The client reports the heartbeat to the dispatcher for aliveness. */
+    default CompletableFuture<Void> reportJobClientHeartbeat(JobID jobId, Time timeout) {
+        return CompletableFuture.completedFuture(null);

Review Comment:
   Use `FutureUtils.completedVoidFuture()` to reduce object creation: it returns a shared, already-completed future instead of allocating a new one on every call.
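A minimal sketch of the pattern behind this suggestion, assuming Java 8+: one already-completed `CompletableFuture<Void>` is created once, and the same instance is returned on every call. `CompletedFutures` and `HeartbeatGatewaySketch` are hypothetical names for illustration, not Flink's actual `FutureUtils` source, and the job ID is simplified to a `String`.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper illustrating a cached, already-completed Void future. */
final class CompletedFutures {
    // Created once; later complete(...) calls on an already-completed future are no-ops.
    private static final CompletableFuture<Void> COMPLETED_VOID =
            CompletableFuture.completedFuture(null);

    private CompletedFutures() {}

    static CompletableFuture<Void> completedVoidFuture() {
        return COMPLETED_VOID;
    }
}

/** Hypothetical stand-in for the default method under review (job ID simplified to String). */
interface HeartbeatGatewaySketch {
    default CompletableFuture<Void> reportJobClientHeartbeat(String jobId) {
        // Returns the shared instance instead of allocating a new future per call.
        return CompletedFutures.completedVoidFuture();
    }
}
```

Sharing one instance is safe because `complete(...)` on an already-completed future does nothing; callers must not use `obtrudeValue` or `obtrudeException` on it, since that would change the result seen by every other caller.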



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1016,6 +1034,47 @@ public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoord
                                 operatorId, serializedRequest, timeout));
     }
 
+    @Override
+    public CompletableFuture<Void> reportJobClientHeartbeat(JobID jobId, Time timeout) {
+        if (!getJobManagerRunner(jobId).isPresent()) {
+            log.warn("Fail to find job {} for client.", jobId);
+        } else {
+            log.debug("Job {} receives client's heartbeat.", jobId);
+            markJobClientAliveness(jobId);
+            resetJobClientAlivenessCheck(jobId);
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    private void markJobClientAliveness(JobID jobId) {
+        if (jobClientAlivenessFutures.containsKey(jobId)) {
+            jobClientAlivenessFutures.get(jobId).complete(null);
+        }
+    }
+
+    private void resetJobClientAlivenessCheck(JobID jobId) {
+        CompletableFuture<Void> clientAlivenessFuture = new CompletableFuture<>();
+        jobClientAlivenessFutures.put(jobId, clientAlivenessFuture);
+        FutureUtils.orTimeout(
+                clientAlivenessFuture,
+                jobClientHeartbeatTimeout,
+                TimeUnit.MILLISECONDS,
+                getMainThreadExecutor());
+        clientAlivenessFuture.whenComplete(
+                (t, throwable) -> {
+                    if (throwable != null) {
+                        if (throwable instanceof TimeoutException) {
+                            log.warn(
+                                    "Haven't receive aliveness from job client and cancel the job {}.",
+                                    jobId);
+                            cancelJob(jobId, webTimeout);

Review Comment:
   The future-based approach seems like overkill for this feature. For each job, it creates a new future (and registers a new timeout) on every heartbeat, which seems expensive.
   
   I'd suggest simply recording the last heartbeat timestamp of each job and periodically checking whether any job has timed out. This is similar to how we check for and release idle TaskManagers (TMs). The timeout may not fire precisely at the configured time, but this approach would be much cheaper.
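A minimal sketch of the suggested timestamp-based check, assuming all calls happen on a single thread (the Dispatcher's main thread in the code under review), so a plain `HashMap` is enough. `JobClientHeartbeatTracker` and its methods are hypothetical, job IDs are simplified to `String`, and the current time is passed in explicitly to keep the logic deterministic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: remembers the last client heartbeat per job and finds timed-out
 * jobs in a periodic check, instead of creating a timeout future per heartbeat.
 * Not thread-safe; all methods are assumed to run on one (main) thread.
 */
final class JobClientHeartbeatTracker {
    private final long timeoutMillis;
    // jobId -> timestamp (ms) of the most recent client heartbeat
    private final Map<String, Long> lastHeartbeatMillis = new HashMap<>();

    JobClientHeartbeatTracker(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** On each heartbeat: a single map update, no future or timer is created. */
    void recordHeartbeat(String jobId, long nowMillis) {
        lastHeartbeatMillis.put(jobId, nowMillis);
    }

    /** Call when the job terminates so it is no longer checked. */
    void stopTracking(String jobId) {
        lastHeartbeatMillis.remove(jobId);
    }

    /** Periodic check: returns jobs whose last heartbeat is more than timeoutMillis old. */
    List<String> findTimedOutJobs(long nowMillis) {
        List<String> timedOut = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastHeartbeatMillis.entrySet()) {
            if (nowMillis - entry.getValue() > timeoutMillis) {
                timedOut.add(entry.getKey());
            }
        }
        return timedOut;
    }
}
```

The caller would run `findTimedOutJobs(now)` on a fixed schedule (the interval is an assumption, e.g. half the timeout) and cancel each returned job. A heartbeat then costs only a map update, and a timeout is detected roughly one check interval late at most, which is the imprecision the comment accepts. Because a job stays tracked until `stopTracking` is called, a job whose cancellation fails would be reported again by the next check.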



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1016,6 +1034,47 @@ public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoord
+                        if (throwable instanceof TimeoutException) {
+                            log.warn(
+                                    "Haven't receive aliveness from job client and cancel the job {}.",
+                                    jobId);
+                            cancelJob(jobId, webTimeout);

Review Comment:
   What happens if something goes wrong when canceling the job, e.g., a timeout?
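One possible way to handle this, sketched under assumptions rather than taken from the PR: inspect the future returned by the cancel call instead of discarding it, log each failure, and retry a bounded number of times before giving up. `CancelRetrySketch`, `cancelWithRetry` and `maxAttempts` are hypothetical; the `Supplier` stands in for the `cancelJob(jobId, webTimeout)` call in the snippet above, whose result the snippet does not inspect.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.logging.Logger;

/** Hypothetical helper: retries a failed cancellation a bounded number of times. */
final class CancelRetrySketch {
    private static final Logger LOG = Logger.getLogger(CancelRetrySketch.class.getName());

    private CancelRetrySketch() {}

    /**
     * Runs cancelAction; if its future fails (e.g. with a timeout), runs it again, up to
     * maxAttempts attempts in total. The returned future completes normally on the first
     * success, or exceptionally with the last failure once all attempts are used up.
     */
    static CompletableFuture<Void> cancelWithRetry(
            Supplier<? extends CompletableFuture<?>> cancelAction, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        CompletableFuture<Void> result = new CompletableFuture<>();
        attempt(cancelAction, maxAttempts, 1, result);
        return result;
    }

    private static void attempt(
            Supplier<? extends CompletableFuture<?>> cancelAction,
            int maxAttempts,
            int attemptNumber,
            CompletableFuture<Void> result) {
        CompletableFuture<?> cancelFuture;
        try {
            cancelFuture = cancelAction.get();
        } catch (RuntimeException e) {
            // Treat a synchronous exception like a failed future so it is retried too.
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            cancelFuture = failed;
        }
        cancelFuture.whenComplete(
                (ignored, failure) -> {
                    if (failure == null) {
                        result.complete(null);
                    } else if (attemptNumber >= maxAttempts) {
                        LOG.warning("Giving up after " + attemptNumber + " attempts: " + failure);
                        result.completeExceptionally(failure);
                    } else {
                        LOG.warning("Cancel attempt " + attemptNumber + " failed, retrying: " + failure);
                        attempt(cancelAction, maxAttempts, attemptNumber + 1, result);
                    }
                });
    }
}
```

Retries here are immediate; a real implementation would likely add a back-off, or, with the timestamp-based check suggested in the earlier comment, simply keep the job tracked so the next periodic check tries again.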



-- 
This is an automated message from the Apache Git Service.