You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/02/18 09:14:58 UTC
[flink] 10/12: [FLINK-21100][coordination] Pass FatalErrorHandler to scheduler factory
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3c463416d9b1303d7b54ec3fccf8f2c0d9f436ab
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Feb 9 21:22:37 2021 +0100
[FLINK-21100][coordination] Pass FatalErrorHandler to scheduler factory
---
.../src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java | 1 +
.../org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java | 2 ++
.../java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java | 2 ++
.../java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java | 2 ++
.../org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java | 2 ++
5 files changed, 9 insertions(+)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 6776462..844d144 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -356,6 +356,7 @@ public class JobMaster extends PermanentlyFencedRpcEndpoint<JobMasterId>
executionDeploymentTracker,
initializationTimestamp,
getMainThreadExecutor(),
+ fatalErrorHandler,
jobStatusListener);
return scheduler;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
index 27e7b9b..c888f65 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.slf4j.Logger;
@@ -66,6 +67,7 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
final ExecutionDeploymentTracker executionDeploymentTracker,
long initializationTimestamp,
final ComponentMainThreadExecutor mainThreadExecutor,
+ final FatalErrorHandler fatalErrorHandler,
final JobStatusListener jobStatusListener)
throws Exception {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
index efb972a..1707779 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.slf4j.Logger;
@@ -58,6 +59,7 @@ public interface SchedulerNGFactory {
ExecutionDeploymentTracker executionDeploymentTracker,
long initializationTimestamp,
ComponentMainThreadExecutor mainThreadExecutor,
+ FatalErrorHandler fatalErrorHandler,
JobStatusListener jobStatusListener)
throws Exception;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
index b83d80b..8485fe8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
import org.apache.flink.runtime.jobmaster.utils.JobMasterBuilder;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
@@ -103,6 +104,7 @@ public class JobMasterSchedulerTest extends TestLogger {
ExecutionDeploymentTracker executionDeploymentTracker,
long initializationTimestamp,
ComponentMainThreadExecutor mainThreadExecutor,
+ FatalErrorHandler fatalErrorHandler,
JobStatusListener jobStatusListener) {
return TestingSchedulerNG.newBuilder()
.setStartSchedulingRunnable(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java
index d066aec..cd3ab44 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.slf4j.Logger;
@@ -66,6 +67,7 @@ public class TestingSchedulerNGFactory implements SchedulerNGFactory {
ExecutionDeploymentTracker executionDeploymentTracker,
long initializationTimestamp,
ComponentMainThreadExecutor mainThreadExecutor,
+ FatalErrorHandler fatalErrorHandler,
JobStatusListener jobStatusListener)
throws Exception {
return schedulerNG;