You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text copy.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/02/18 09:14:58 UTC

[flink] 10/12: [FLINK-21100][coordination] Pass FatalErrorHandler to scheduler factory

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3c463416d9b1303d7b54ec3fccf8f2c0d9f436ab
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Feb 9 21:22:37 2021 +0100

    [FLINK-21100][coordination] Pass FatalErrorHandler to scheduler factory
---
 .../src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java     | 1 +
 .../org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java     | 2 ++
 .../java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java     | 2 ++
 .../java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java | 2 ++
 .../org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java   | 2 ++
 5 files changed, 9 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 6776462..844d144 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -356,6 +356,7 @@ public class JobMaster extends PermanentlyFencedRpcEndpoint<JobMasterId>
                         executionDeploymentTracker,
                         initializationTimestamp,
                         getMainThreadExecutor(),
+                        fatalErrorHandler,
                         jobStatusListener);
 
         return scheduler;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
index 27e7b9b..c888f65 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 
 import org.slf4j.Logger;
@@ -66,6 +67,7 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory {
             final ExecutionDeploymentTracker executionDeploymentTracker,
             long initializationTimestamp,
             final ComponentMainThreadExecutor mainThreadExecutor,
+            final FatalErrorHandler fatalErrorHandler,
             final JobStatusListener jobStatusListener)
             throws Exception {
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
index efb972a..1707779 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 
 import org.slf4j.Logger;
@@ -58,6 +59,7 @@ public interface SchedulerNGFactory {
             ExecutionDeploymentTracker executionDeploymentTracker,
             long initializationTimestamp,
             ComponentMainThreadExecutor mainThreadExecutor,
+            FatalErrorHandler fatalErrorHandler,
             JobStatusListener jobStatusListener)
             throws Exception;
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
index b83d80b..8485fe8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
 import org.apache.flink.runtime.jobmaster.utils.JobMasterBuilder;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
 import org.apache.flink.runtime.scheduler.SchedulerNG;
 import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
@@ -103,6 +104,7 @@ public class JobMasterSchedulerTest extends TestLogger {
                 ExecutionDeploymentTracker executionDeploymentTracker,
                 long initializationTimestamp,
                 ComponentMainThreadExecutor mainThreadExecutor,
+                FatalErrorHandler fatalErrorHandler,
                 JobStatusListener jobStatusListener) {
             return TestingSchedulerNG.newBuilder()
                     .setStartSchedulingRunnable(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java
index d066aec..cd3ab44 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 
 import org.slf4j.Logger;
@@ -66,6 +67,7 @@ public class TestingSchedulerNGFactory implements SchedulerNGFactory {
             ExecutionDeploymentTracker executionDeploymentTracker,
             long initializationTimestamp,
             ComponentMainThreadExecutor mainThreadExecutor,
+            FatalErrorHandler fatalErrorHandler,
             JobStatusListener jobStatusListener)
             throws Exception {
         return schedulerNG;