You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2023/12/01 09:47:45 UTC

(flink-kubernetes-operator) branch main updated: [FLINK-33522] Be ware of SerializedThrowable when checking for StopWithSavepointStoppingException (#716)

This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 51a91049 [FLINK-33522] Be ware of SerializedThrowable when checking for StopWithSavepointStoppingException (#716)
51a91049 is described below

commit 51a91049b5f17f8a0b21e11feceb4410a97c50c1
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Fri Dec 1 10:47:40 2023 +0100

    [FLINK-33522] Be ware of SerializedThrowable when checking for StopWithSavepointStoppingException (#716)
    
    Turns out that the previous detection code in #706 may not always fire correctly
    due to an encapsulated serialized throwable. This minor change fixes that.
---
 .../flink/kubernetes/operator/service/AbstractFlinkService.java     | 6 ++++--
 .../flink/kubernetes/operator/service/AbstractFlinkServiceTest.java | 5 +++--
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index 0821b439..3c4fe4aa 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -331,8 +331,10 @@ public abstract class AbstractFlinkService implements FlinkService {
                                     exception);
                         } catch (Exception e) {
                             var stopWithSavepointException =
-                                    ExceptionUtils.findThrowable(
-                                            e, StopWithSavepointStoppingException.class);
+                                    ExceptionUtils.findThrowableSerializedAware(
+                                            e,
+                                            StopWithSavepointStoppingException.class,
+                                            getClass().getClassLoader());
                             if (stopWithSavepointException.isPresent()) {
                                 // Handle edge case where the savepoint completes but the job fails
                                 // right afterward.
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
index c12caf21..a108b8bb 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
@@ -651,8 +651,9 @@ public class AbstractFlinkServiceTest {
                     if (failAfterSavepointCompletes) {
                         stopWithSavepointFuture.completeExceptionally(
                                 new CompletionException(
-                                        new StopWithSavepointStoppingException(
-                                                savepointPath, jobID)));
+                                        new SerializedThrowable(
+                                                new StopWithSavepointStoppingException(
+                                                        savepointPath, jobID))));
                     } else {
                         stopWithSavepointFuture.complete(
                                 new Tuple3<>(id, formatType, savepointDir));