You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/11/05 08:38:57 UTC
[flink] branch master updated: [FLINK-24770] Remove error from JobStatusListener
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0a5ca14 [FLINK-24770] Remove error from JobStatusListener
0a5ca14 is described below
commit 0a5ca14df4a423792c270e6c7a6129715a598754
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri Nov 5 09:38:33 2021 +0100
[FLINK-24770] Remove error from JobStatusListener
The error parameter of JobStatusListener#jobStatusChanges is not currently used anywhere and makes it harder to add more uses of the interface.
---
.../runtime/checkpoint/CheckpointCoordinatorDeActivator.java | 3 +--
.../flink/runtime/executiongraph/DefaultExecutionGraph.java | 8 +++-----
.../apache/flink/runtime/executiongraph/JobStatusListener.java | 4 +---
.../main/java/org/apache/flink/runtime/jobmaster/JobMaster.java | 5 +----
.../flink/runtime/scheduler/adaptive/AdaptiveScheduler.java | 3 +--
.../runtime/scheduler/GloballyTerminalJobStatusListener.java | 3 +--
.../org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java | 3 +--
.../runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java | 2 +-
.../flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java | 2 +-
9 files changed, 11 insertions(+), 22 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
index 234454d..b4f52dc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
@@ -37,8 +37,7 @@ public class CheckpointCoordinatorDeActivator implements JobStatusListener {
}
@Override
- public void jobStatusChanges(
- JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
+ public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp) {
if (newJobStatus == JobStatus.RUNNING) {
// start the checkpoint scheduler
coordinator.startCheckpointScheduler();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
index 9338261..2906e9c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
@@ -79,7 +79,6 @@ import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.OptionalFailure;
-import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.FutureUtils.ConjunctFuture;
@@ -1047,7 +1046,7 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
error);
stateTimestamps[newState.ordinal()] = System.currentTimeMillis();
- notifyJobStatusChange(newState, error);
+ notifyJobStatusChange(newState);
return true;
} else {
return false;
@@ -1439,14 +1438,13 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
}
}
- private void notifyJobStatusChange(JobStatus newState, Throwable error) {
+ private void notifyJobStatusChange(JobStatus newState) {
if (jobStatusListeners.size() > 0) {
final long timestamp = System.currentTimeMillis();
- final Throwable serializedError = error == null ? null : new SerializedThrowable(error);
for (JobStatusListener listener : jobStatusListeners) {
try {
- listener.jobStatusChanges(getJobID(), newState, timestamp, serializedError);
+ listener.jobStatusChanges(getJobID(), newState, timestamp);
} catch (Throwable t) {
LOG.warn("Error while notifying JobStatusListener", t);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java
index f1ef015..dc30d18 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java
@@ -30,8 +30,6 @@ public interface JobStatusListener {
* @param jobId The ID of the job.
* @param newJobStatus The status the job switched to.
* @param timestamp The timestamp when the status transition occurred.
- * @param error In case the job status switches to a failure state, this is the exception that
- * caused the failure.
*/
- void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error);
+ void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index dcbeaf7..99cf0a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1304,10 +1304,7 @@ public class JobMaster extends PermanentlyFencedRpcEndpoint<JobMasterId>
@Override
public void jobStatusChanges(
- final JobID jobId,
- final JobStatus newJobStatus,
- final long timestamp,
- final Throwable error) {
+ final JobID jobId, final JobStatus newJobStatus, final long timestamp) {
if (running) {
// run in rpc thread to avoid concurrency
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 4d4b5fe..bd9f859 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -1072,8 +1072,7 @@ public class AdaptiveScheduler
jobStatusListener.jobStatusChanges(
jobInformation.getJobID(),
archivedExecutionGraph.getState(),
- archivedExecutionGraph.getStatusTimestamp(archivedExecutionGraph.getState()),
- optionalFailure);
+ archivedExecutionGraph.getStatusTimestamp(archivedExecutionGraph.getState()));
}
jobTerminationFuture.complete(archivedExecutionGraph.getState());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/GloballyTerminalJobStatusListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/GloballyTerminalJobStatusListener.java
index dcc2a1b..5ed648e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/GloballyTerminalJobStatusListener.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/GloballyTerminalJobStatusListener.java
@@ -31,8 +31,7 @@ public class GloballyTerminalJobStatusListener implements JobStatusListener {
new CompletableFuture<>();
@Override
- public void jobStatusChanges(
- JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
+ public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp) {
if (newJobStatus.isGloballyTerminalState()) {
globallyTerminalJobStatusFuture.complete(newJobStatus);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index 6967c72..a973000 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -417,8 +417,7 @@ public class SchedulerTestingUtils {
private ExecutionVertexVersioner executionVertexVersioner = new ExecutionVertexVersioner();
private ExecutionSlotAllocatorFactory executionSlotAllocatorFactory =
new TestExecutionSlotAllocatorFactory();
- private JobStatusListener jobStatusListener =
- (ignoredA, ignoredB, ignoredC, ignoredD) -> {};
+ private JobStatusListener jobStatusListener = (ignoredA, ignoredB, ignoredC) -> {};
public DefaultSchedulerBuilder(
final JobGraph jobGraph, ComponentMainThreadExecutor mainThreadExecutor) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
index 7d9318f..12816fd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
@@ -78,7 +78,7 @@ public class AdaptiveSchedulerBuilder {
error ->
FatalExitExceptionHandler.INSTANCE.uncaughtException(
Thread.currentThread(), error);
- private JobStatusListener jobStatusListener = (ignoredA, ignoredB, ignoredC, ignoredD) -> {};
+ private JobStatusListener jobStatusListener = (ignoredA, ignoredB, ignoredC) -> {};
private long initializationTimestamp = System.currentTimeMillis();
@Nullable private SlotAllocator slotAllocator;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index 147b041..b40be43 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -612,7 +612,7 @@ public class AdaptiveSchedulerTest extends TestLogger {
final AdaptiveScheduler scheduler =
new AdaptiveSchedulerBuilder(createJobGraph(), mainThreadExecutor)
.setJobStatusListener(
- (jobId, newJobStatus, timestamp, error) ->
+ (jobId, newJobStatus, timestamp) ->
jobStatusUpdate.set(newJobStatus))
.build();