You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/11/05 08:38:57 UTC

[flink] branch master updated: [FLINK-24770] Remove error from JobStatusListener

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a5ca14  [FLINK-24770] Remove error from JobStatusListener
0a5ca14 is described below

commit 0a5ca14df4a423792c270e6c7a6129715a598754
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri Nov 5 09:38:33 2021 +0100

    [FLINK-24770] Remove error from JobStatusListener
    
    The error is not currently used anywhere and makes it harder to add more uses of the interface.
---
 .../runtime/checkpoint/CheckpointCoordinatorDeActivator.java      | 3 +--
 .../flink/runtime/executiongraph/DefaultExecutionGraph.java       | 8 +++-----
 .../apache/flink/runtime/executiongraph/JobStatusListener.java    | 4 +---
 .../main/java/org/apache/flink/runtime/jobmaster/JobMaster.java   | 5 +----
 .../flink/runtime/scheduler/adaptive/AdaptiveScheduler.java       | 3 +--
 .../runtime/scheduler/GloballyTerminalJobStatusListener.java      | 3 +--
 .../org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java | 3 +--
 .../runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java      | 2 +-
 .../flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java   | 2 +-
 9 files changed, 11 insertions(+), 22 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
index 234454d..b4f52dc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
@@ -37,8 +37,7 @@ public class CheckpointCoordinatorDeActivator implements JobStatusListener {
     }
 
     @Override
-    public void jobStatusChanges(
-            JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
+    public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp) {
         if (newJobStatus == JobStatus.RUNNING) {
             // start the checkpoint scheduler
             coordinator.startCheckpointScheduler();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
index 9338261..2906e9c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
@@ -79,7 +79,6 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.IterableUtils;
 import org.apache.flink.util.OptionalFailure;
-import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.concurrent.FutureUtils.ConjunctFuture;
@@ -1047,7 +1046,7 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
                     error);
 
             stateTimestamps[newState.ordinal()] = System.currentTimeMillis();
-            notifyJobStatusChange(newState, error);
+            notifyJobStatusChange(newState);
             return true;
         } else {
             return false;
@@ -1439,14 +1438,13 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
         }
     }
 
-    private void notifyJobStatusChange(JobStatus newState, Throwable error) {
+    private void notifyJobStatusChange(JobStatus newState) {
         if (jobStatusListeners.size() > 0) {
             final long timestamp = System.currentTimeMillis();
-            final Throwable serializedError = error == null ? null : new SerializedThrowable(error);
 
             for (JobStatusListener listener : jobStatusListeners) {
                 try {
-                    listener.jobStatusChanges(getJobID(), newState, timestamp, serializedError);
+                    listener.jobStatusChanges(getJobID(), newState, timestamp);
                 } catch (Throwable t) {
                     LOG.warn("Error while notifying JobStatusListener", t);
                 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java
index f1ef015..dc30d18 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java
@@ -30,8 +30,6 @@ public interface JobStatusListener {
      * @param jobId The ID of the job.
      * @param newJobStatus The status the job switched to.
      * @param timestamp The timestamp when the status transition occurred.
-     * @param error In case the job status switches to a failure state, this is the exception that
-     *     caused the failure.
      */
-    void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error);
+    void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index dcbeaf7..99cf0a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1304,10 +1304,7 @@ public class JobMaster extends PermanentlyFencedRpcEndpoint<JobMasterId>
 
         @Override
         public void jobStatusChanges(
-                final JobID jobId,
-                final JobStatus newJobStatus,
-                final long timestamp,
-                final Throwable error) {
+                final JobID jobId, final JobStatus newJobStatus, final long timestamp) {
 
             if (running) {
                 // run in rpc thread to avoid concurrency
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 4d4b5fe..bd9f859 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -1072,8 +1072,7 @@ public class AdaptiveScheduler
             jobStatusListener.jobStatusChanges(
                     jobInformation.getJobID(),
                     archivedExecutionGraph.getState(),
-                    archivedExecutionGraph.getStatusTimestamp(archivedExecutionGraph.getState()),
-                    optionalFailure);
+                    archivedExecutionGraph.getStatusTimestamp(archivedExecutionGraph.getState()));
         }
 
         jobTerminationFuture.complete(archivedExecutionGraph.getState());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/GloballyTerminalJobStatusListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/GloballyTerminalJobStatusListener.java
index dcc2a1b..5ed648e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/GloballyTerminalJobStatusListener.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/GloballyTerminalJobStatusListener.java
@@ -31,8 +31,7 @@ public class GloballyTerminalJobStatusListener implements JobStatusListener {
             new CompletableFuture<>();
 
     @Override
-    public void jobStatusChanges(
-            JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
+    public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp) {
         if (newJobStatus.isGloballyTerminalState()) {
             globallyTerminalJobStatusFuture.complete(newJobStatus);
         }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index 6967c72..a973000 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -417,8 +417,7 @@ public class SchedulerTestingUtils {
         private ExecutionVertexVersioner executionVertexVersioner = new ExecutionVertexVersioner();
         private ExecutionSlotAllocatorFactory executionSlotAllocatorFactory =
                 new TestExecutionSlotAllocatorFactory();
-        private JobStatusListener jobStatusListener =
-                (ignoredA, ignoredB, ignoredC, ignoredD) -> {};
+        private JobStatusListener jobStatusListener = (ignoredA, ignoredB, ignoredC) -> {};
 
         public DefaultSchedulerBuilder(
                 final JobGraph jobGraph, ComponentMainThreadExecutor mainThreadExecutor) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
index 7d9318f..12816fd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
@@ -78,7 +78,7 @@ public class AdaptiveSchedulerBuilder {
             error ->
                     FatalExitExceptionHandler.INSTANCE.uncaughtException(
                             Thread.currentThread(), error);
-    private JobStatusListener jobStatusListener = (ignoredA, ignoredB, ignoredC, ignoredD) -> {};
+    private JobStatusListener jobStatusListener = (ignoredA, ignoredB, ignoredC) -> {};
     private long initializationTimestamp = System.currentTimeMillis();
 
     @Nullable private SlotAllocator slotAllocator;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index 147b041..b40be43 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -612,7 +612,7 @@ public class AdaptiveSchedulerTest extends TestLogger {
         final AdaptiveScheduler scheduler =
                 new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor)
                         .setJobStatusListener(
-                                (jobId, newJobStatus, timestamp, error) ->
+                                (jobId, newJobStatus, timestamp) ->
                                         jobStatusUpdate.set(newJobStatus))
                         .build();