Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/08 10:10:27 UTC

[GitHub] [flink] Tartarus0zm opened a new pull request, #20223: [FLINK-28457][runtime] Introduce JobStatusHook

Tartarus0zm opened a new pull request, #20223:
URL: https://github.com/apache/flink/pull/20223

   ## What is the purpose of the change
   
   Introduce JobStatusHook
   Users can implement a custom hook that is executed on the JobManager (JM) side.
   
   
   ## Brief change log
   
   Introduce JobStatusHook
   Users can register JobStatusHook through StreamGraph.
   
   ## Verifying this change
   
   DefaultSchedulerTest#testJobStatusHookWithJobFailed
   DefaultSchedulerTest#testJobStatusHookWithJobCanceled
   DefaultSchedulerTest#testJobStatusHookWithJobFinished
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] Tartarus0zm commented on pull request #20223: [FLINK-28457][runtime] Introduce JobStatusHook

Posted by GitBox <gi...@apache.org>.
Tartarus0zm commented on PR #20223:
URL: https://github.com/apache/flink/pull/20223#issuecomment-1181267795

   @lsyldliu @reswqa Thanks for your review. I have addressed your comments.
   Please take another look.
   cc @gaoyunhaii 
   




[GitHub] [flink] gaoyunhaii closed pull request #20223: [FLINK-28457][runtime] Introduce JobStatusHook

Posted by GitBox <gi...@apache.org>.
gaoyunhaii closed pull request #20223: [FLINK-28457][runtime] Introduce JobStatusHook
URL: https://github.com/apache/flink/pull/20223




[GitHub] [flink] gaoyunhaii commented on a diff in pull request #20223: [FLINK-28457][runtime] Introduce JobStatusHook

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on code in PR #20223:
URL: https://github.com/apache/flink/pull/20223#discussion_r919606689


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusHook.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+
+import java.io.Serializable;
+
+/**
+ * Hooks provided by users on job status changing. Triggered at the initial(CREATED) and final
+ * state(FINISHED/CANCELED/FAILED) of the job.
+ *
+ * <p>Usage examples: <code>
+ *     StreamGraph streamGraph = env.getStreamGraph();
+ *     streamGraph.registerJobStatusHook(myJobStatusHook);
+ *     streamGraph.setJobName("my_flink");
+ *     env.execute(streamGraph);
+ * </code>
+ */
+@Internal
+public interface JobStatusHook extends Serializable {
+
+    /** When Job become {@link JobStatus#CREATED} status, it would only be called one time. */

Review Comment:
   `become` -> `becomes` 
   
   nit: import JobStatus to avoid warnings



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -1618,6 +1621,141 @@ public void testCheckpointCleanerIsClosedAfterCheckpointServices() throws Except
         }
     }
 
+    @Test
+    public void testJobStatusHookWithJobFailed() throws Exception {

Review Comment:
   The three test methods seem quite similar. Could we extract a common helper method?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java:
##########
@@ -628,4 +632,12 @@ public void writeUserArtifactEntriesToConfiguration() {
                     userArtifact.getKey(), userArtifact.getValue(), jobConfiguration);
         }
     }
+
+    public void setJobStatusHooks(List<JobStatusHook> hooks) {
+        this.jobStatusHooks = hooks;

Review Comment:
   Might also add a `checkNotNull`?





[GitHub] [flink] Tartarus0zm commented on pull request #20223: [FLINK-28457][runtime] Introduce JobStatusHook

Posted by GitBox <gi...@apache.org>.
Tartarus0zm commented on PR #20223:
URL: https://github.com/apache/flink/pull/20223#issuecomment-1183132966

   @flinkbot run azure




[GitHub] [flink] lsyldliu commented on a diff in pull request #20223: [FLINK-28457][runtime] Introduce JobStatusHook

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on code in PR #20223:
URL: https://github.com/apache/flink/pull/20223#discussion_r918724893


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -271,6 +271,10 @@ private JobGraph createJobGraph() {
             throw new FlinkRuntimeException("Error in serialization.", e);
         }
 
+        if (streamGraph.getJobStatusHooks().size() > 0) {

Review Comment:
   ```suggestion
           if (!streamGraph.getJobStatusHooks().isEmpty()) {
   ```





[GitHub] [flink] lsyldliu commented on a diff in pull request #20223: [FLINK-28457][runtime] Introduce JobStatusHook

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on code in PR #20223:
URL: https://github.com/apache/flink/pull/20223#discussion_r916762924


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java:
##########
@@ -1544,6 +1552,30 @@ private void notifyJobStatusChange(JobStatus newState) {
         }
     }
 
+    private void notifyJobStatusHooks(JobStatus newState, Throwable cause) {
+        JobID jobID = jobInformation.getJobId();
+        for (JobStatusHook hook : jobStatusHooks) {
+            try {
+                switch (newState) {
+                    case CREATED:
+                        hook.onCreated(jobID);
+                        break;
+                    case CANCELED:
+                        hook.onCanceled(jobID);
+                        break;
+                    case FAILED:
+                        hook.onFailed(jobID, cause);
+                        break;
+                    case FINISHED:
+                        hook.onFinished(jobID);
+                        break;
+                }
+            } catch (Throwable ignore) {
+                LOG.warn("Error while notifying JobStatusHook[{}]", hook.getClass(), ignore);

Review Comment:
   I don't think this exception should be ignored. If a hook fails, the job will later run into other exceptions that confuse the user, so we should throw this exception early.
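
The fail-fast behavior requested here could look roughly like the sketch below. It is not the code from the PR: `JobID`, `JobStatus`, and `JobStatusHook` are minimal stand-ins declared locally so the example compiles without Flink, `HookNotifier` is a hypothetical holder class, and wrapping the failure in a `RuntimeException` is only an assumption about how the error might be surfaced.

```java
import java.util.List;

// Stand-in for org.apache.flink.api.common.JobID (assumption, for illustration only).
class JobID {}

// Stand-in for Flink's JobStatus enum, reduced to the states used in this sketch.
enum JobStatus { CREATED, RUNNING, FINISHED, CANCELED, FAILED }

// Stand-in mirroring the callbacks of the JobStatusHook interface under review.
interface JobStatusHook {
    void onCreated(JobID jobId);
    void onFinished(JobID jobId);
    void onFailed(JobID jobId, Throwable cause);
    void onCanceled(JobID jobId);
}

// Hypothetical holder for the notification logic.
class HookNotifier {
    static void notifyJobStatusHooks(
            List<JobStatusHook> hooks, JobID jobId, JobStatus newState, Throwable cause) {
        for (JobStatusHook hook : hooks) {
            try {
                switch (newState) {
                    case CREATED:
                        hook.onCreated(jobId);
                        break;
                    case CANCELED:
                        hook.onCanceled(jobId);
                        break;
                    case FAILED:
                        hook.onFailed(jobId, cause);
                        break;
                    case FINISHED:
                        hook.onFinished(jobId);
                        break;
                    default:
                        // Other states are not reported to hooks.
                        break;
                }
            } catch (Throwable t) {
                // Rethrow with context instead of logging and continuing,
                // so the first hook failure is the error the user sees.
                throw new RuntimeException(
                        "Error while notifying JobStatusHook[" + hook.getClass() + "]", t);
            }
        }
    }
}
```

With this shape, a hook that throws in `onCreated` stops the notification loop immediately, and the original exception is preserved as the cause of the rethrown one.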



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusHook.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+
+import java.io.Serializable;
+
+/** Hooks provided by users on job status changing. */

Review Comment:
   I suggest adding more documentation to this hook so that users know what it does and how to use it.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusHook.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+
+import java.io.Serializable;
+
+/** Hooks provided by users on job status changing. */
+@Internal
+public interface JobStatusHook extends Serializable {
+
+    /** When Job become CREATED status. It would only be called one time. */

Review Comment:
   ```suggestion
       /** When Job become CREATED status, it would only be called one time. */
   ```



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -225,6 +225,8 @@ private JobGraph createJobGraph() {
 
         setVertexDescription();
 
+        jobGraph.setJobStatusHooks(streamGraph.getJobStatusHooks());

Review Comment:
   I think it would be better to set the hooks only when there is at least one (hooks > 0).



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java:
##########
@@ -1034,4 +1037,18 @@ public void setVertexNameIncludeIndexPrefix(boolean includePrefix) {
     public boolean isVertexNameIncludeIndexPrefix() {
         return this.vertexNameIncludeIndexPrefix;
     }
+
+    /** Registers the JobStatusHook. */
+    public void registerJobStatusHook(JobStatusHook hook) {
+        if (hook == null) {
+            throw new IllegalArgumentException();

Review Comment:
   Give an explicit exception message.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java:
##########
@@ -628,4 +632,13 @@ public void writeUserArtifactEntriesToConfiguration() {
                     userArtifact.getKey(), userArtifact.getValue(), jobConfiguration);
         }
     }
+
+    public void setJobStatusHooks(List<JobStatusHook> hooks) {

Review Comment:
   I see two ways to implement this:
   1. Follow the `addJars` method: rename this method to `addJobStatusHooks` and add the elements of the given list to the existing list. That way hooks can be registered multiple times instead of the list being cleared on every set.
   2. Follow the `setClasspaths` method: initialize `jobStatusHooks` to `Collections.emptyList()` and assign the given list to it here.
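
The two options above can be sketched side by side. Everything here is illustrative: `JobStatusHook` is an empty stand-in, `AccumulatingJobGraph` and `ReplacingJobGraph` are hypothetical classes that hold only the hook list, and `Objects.requireNonNull` stands in for the `checkNotNull` helper mentioned elsewhere in this review.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Empty stand-in for the JobStatusHook interface (assumption, for illustration only).
interface JobStatusHook {}

// Option 1: accumulate hooks across calls, in the style of an add* method.
class AccumulatingJobGraph {
    private final List<JobStatusHook> jobStatusHooks = new ArrayList<>();

    void addJobStatusHooks(List<JobStatusHook> hooks) {
        jobStatusHooks.addAll(Objects.requireNonNull(hooks, "hooks must not be null"));
    }

    List<JobStatusHook> getJobStatusHooks() {
        return Collections.unmodifiableList(jobStatusHooks);
    }
}

// Option 2: replace the list on every call; the field starts as an empty list.
class ReplacingJobGraph {
    private List<JobStatusHook> jobStatusHooks = Collections.emptyList();

    void setJobStatusHooks(List<JobStatusHook> hooks) {
        this.jobStatusHooks = Objects.requireNonNull(hooks, "hooks must not be null");
    }

    List<JobStatusHook> getJobStatusHooks() {
        return jobStatusHooks;
    }
}
```

The difference shows up on the second call: the accumulating variant keeps the hooks from both calls, while the replacing variant keeps only the hooks from the most recent call.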





[GitHub] [flink] Tartarus0zm commented on a diff in pull request #20223: [FLINK-28457][runtime] Introduce JobStatusHook

Posted by GitBox <gi...@apache.org>.
Tartarus0zm commented on code in PR #20223:
URL: https://github.com/apache/flink/pull/20223#discussion_r918502121


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java:
##########
@@ -1544,6 +1552,30 @@ private void notifyJobStatusChange(JobStatus newState) {
         }
     }
 
+    private void notifyJobStatusHooks(JobStatus newState, Throwable cause) {
+        JobID jobID = jobInformation.getJobId();
+        for (JobStatusHook hook : jobStatusHooks) {
+            try {
+                switch (newState) {
+                    case CREATED:
+                        hook.onCreated(jobID);
+                        break;
+                    case CANCELED:
+                        hook.onCanceled(jobID);
+                        break;
+                    case FAILED:
+                        hook.onFailed(jobID, cause);
+                        break;
+                    case FINISHED:
+                        hook.onFinished(jobID);
+                        break;
+                }
+            } catch (Throwable ignore) {
+                LOG.warn("Error while notifying JobStatusHook[{}]", hook.getClass(), ignore);

Review Comment:
   good catch





[GitHub] [flink] Tartarus0zm commented on a diff in pull request #20223: [FLINK-28457][runtime] Introduce JobStatusHook

Posted by GitBox <gi...@apache.org>.
Tartarus0zm commented on code in PR #20223:
URL: https://github.com/apache/flink/pull/20223#discussion_r918501911


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java:
##########
@@ -1544,6 +1552,30 @@ private void notifyJobStatusChange(JobStatus newState) {
         }
     }
 
+    private void notifyJobStatusHooks(JobStatus newState, Throwable cause) {
+        JobID jobID = jobInformation.getJobId();
+        for (JobStatusHook hook : jobStatusHooks) {
+            try {
+                switch (newState) {
+                    case CREATED:
+                        hook.onCreated(jobID);

Review Comment:
   This is not necessary; only the FAILED branch uses `cause`.





[GitHub] [flink] flinkbot commented on pull request #20223: [FLINK-28457][runtime] Introduce JobStatusHook

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20223:
URL: https://github.com/apache/flink/pull/20223#issuecomment-1178818119

   ## CI report:
   
   * 33ee73026502609977ee857f05d29da2c107627d UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] Tartarus0zm commented on pull request #20223: [FLINK-28457][runtime] Introduce JobStatusHook

Posted by GitBox <gi...@apache.org>.
Tartarus0zm commented on PR #20223:
URL: https://github.com/apache/flink/pull/20223#issuecomment-1178808754

   @gaoyunhaii  please take a look, if you have time, thanks




[GitHub] [flink] Tartarus0zm commented on pull request #20223: [FLINK-28457][runtime] Introduce JobStatusHook

Posted by GitBox <gi...@apache.org>.
Tartarus0zm commented on PR #20223:
URL: https://github.com/apache/flink/pull/20223#issuecomment-1182905270

   @gaoyunhaii @lsyldliu I have addressed your comments. Please take another look.




[GitHub] [flink] reswqa commented on a diff in pull request #20223: [FLINK-28457][runtime] Introduce JobStatusHook

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20223:
URL: https://github.com/apache/flink/pull/20223#discussion_r916800623


##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingJobStatusHook.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+
+import java.util.List;
+
+/** {@link JobStatusHook} implementation for testing purposes. */
+public class TestingJobStatusHook implements JobStatusHook {
+
+    // Record job status changes.
+    private final List<String> jobStatus;
+
+    public TestingJobStatusHook(List<String> jobStatus) {
+        this.jobStatus = jobStatus;
+    }
+
+    @Override
+    public void onCreated(JobID jobId) {
+        jobStatus.add("Created");

Review Comment:
   Because this is a public testing mock class rather than an internal one, we should consider making it more flexible. I suggest the following pattern for all `onXXX` methods:
   ```
   private Consumer<JobID> onCreatedConsumer = (jobID) -> {};
   ```
   and set it via `setOnCreatedConsumer(Consumer<JobID> onCreatedConsumer)`.
   The callback method would then look like this:
   ```
   public void onCreated(JobID jobId) {
       onCreatedConsumer.accept(jobId);
   }
   ```
   This makes the testing mock class more flexible and easier to extend.
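
Applied to all four callbacks, the suggested pattern might look like the sketch below. `JobID` and `JobStatusHook` are local stand-ins so the example compiles without Flink, and using a `BiConsumer` for `onFailed` is an assumption, since the comment only spells out the `onCreated` case.

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Stand-in for org.apache.flink.api.common.JobID (assumption, for illustration only).
class JobID {}

// Stand-in mirroring the callbacks of the JobStatusHook interface under review.
interface JobStatusHook {
    void onCreated(JobID jobId);
    void onFinished(JobID jobId);
    void onFailed(JobID jobId, Throwable cause);
    void onCanceled(JobID jobId);
}

// Testing hook whose behavior is injected per callback; each callback defaults to a no-op.
class TestingJobStatusHook implements JobStatusHook {
    private Consumer<JobID> onCreatedConsumer = jobId -> {};
    private Consumer<JobID> onFinishedConsumer = jobId -> {};
    private BiConsumer<JobID, Throwable> onFailedConsumer = (jobId, cause) -> {};
    private Consumer<JobID> onCanceledConsumer = jobId -> {};

    void setOnCreatedConsumer(Consumer<JobID> consumer) {
        this.onCreatedConsumer = consumer;
    }

    void setOnFinishedConsumer(Consumer<JobID> consumer) {
        this.onFinishedConsumer = consumer;
    }

    void setOnFailedConsumer(BiConsumer<JobID, Throwable> consumer) {
        this.onFailedConsumer = consumer;
    }

    void setOnCanceledConsumer(Consumer<JobID> consumer) {
        this.onCanceledConsumer = consumer;
    }

    @Override
    public void onCreated(JobID jobId) {
        onCreatedConsumer.accept(jobId);
    }

    @Override
    public void onFinished(JobID jobId) {
        onFinishedConsumer.accept(jobId);
    }

    @Override
    public void onFailed(JobID jobId, Throwable cause) {
        onFailedConsumer.accept(jobId, cause);
    }

    @Override
    public void onCanceled(JobID jobId) {
        onCanceledConsumer.accept(jobId);
    }
}
```

A test then sets only the callbacks it cares about, for example recording `"Created"` into a list from `setOnCreatedConsumer`, and leaves the rest as no-ops.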





[GitHub] [flink] reswqa commented on a diff in pull request #20223: [FLINK-28457][runtime] Introduce JobStatusHook

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20223:
URL: https://github.com/apache/flink/pull/20223#discussion_r916804257


##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -1618,6 +1622,99 @@ public void testCheckpointCleanerIsClosedAfterCheckpointServices() throws Except
         }
     }
 
+    @Test
+    public void testJobStatusHookWithJobFailed() throws Exception {
+        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+        List<JobStatusHook> jobStatusHooks = new ArrayList<>();
+        List<String> jobStatusList = new LinkedList<>();
+        jobStatusHooks.add(new TestingJobStatusHook(jobStatusList));
+        jobGraph.setJobStatusHooks(jobStatusHooks);
+
+        testRestartBackoffTimeStrategy.setCanRestart(false);
+
+        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+
+        final ArchivedExecutionVertex onlyExecutionVertex =
+                Iterables.getOnlyElement(
+                        scheduler
+                                .requestJob()
+                                .getArchivedExecutionGraph()
+                                .getAllExecutionVertices());
+        final ExecutionAttemptID attemptId =
+                onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
+
+        scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId));
+
+        taskRestartExecutor.triggerScheduledTasks();
+
+        waitForTermination(scheduler);
+        final JobStatus jobStatus = scheduler.requestJobStatus();
+        assertThat(jobStatus, is(equalTo(JobStatus.FAILED)));

Review Comment:
   New test methods should avoid Hamcrest.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java:
##########
@@ -1544,6 +1552,30 @@ private void notifyJobStatusChange(JobStatus newState) {
         }
     }
 
+    private void notifyJobStatusHooks(JobStatus newState, Throwable cause) {

Review Comment:
   ```suggestion
       private void notifyJobStatusHooks(JobStatus newState, @Nullable Throwable cause) {
   ```



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java:
##########
@@ -1034,4 +1037,18 @@ public void setVertexNameIncludeIndexPrefix(boolean includePrefix) {
     public boolean isVertexNameIncludeIndexPrefix() {
         return this.vertexNameIncludeIndexPrefix;
     }
+
+    /** Registers the JobStatusHook. */
+    public void registerJobStatusHook(JobStatusHook hook) {
+        if (hook == null) {
+            throw new IllegalArgumentException();
+        }

Review Comment:
   ```suggestion
           checkNotNull(hook, "Registering a null JobStatusHook is not allowed. ");
   ```



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -225,6 +225,8 @@ private JobGraph createJobGraph() {
 
         setVertexDescription();
 
+        jobGraph.setJobStatusHooks(streamGraph.getJobStatusHooks());

Review Comment:
   +1



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -97,6 +99,7 @@
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
 
+import org.assertj.core.util.Lists;

Review Comment:
   Why use AssertJ's `Lists` but not AssertJ for the assertions? In addition, it's best to use AssertJ for all newly introduced tests.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingJobStatusHook.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+
+import java.util.List;
+
+/** {@link JobStatusHook} implementation for testing purposes. */
+public class TestingJobStatusHook implements JobStatusHook {
+
+    // Record job status changes.
+    private final List<String> jobStatus;
+
+    public TestingJobStatusHook(List<String> jobStatus) {
+        this.jobStatus = jobStatus;
+    }
+
+    @Override
+    public void onCreated(JobID jobId) {
+        jobStatus.add("Created");

Review Comment:
   Because this is a public testing mock class rather than an internal one, we should consider making it more flexible. I suggest the following pattern for all `onXXX` methods:
   ```
   private Consumer<JobID> onCreatedConsumer = (jobID) -> {};
   ```
   and set it via `setOnCreatedConsumer(Consumer<JobID> onCreatedConsumer)`.
   The callback method would then look like this:
   ```
   public void onCreated(JobID jobId) {
       onCreatedConsumer.accept(jobId);
   }
   ```
   This makes the testing mock class more flexible and easier to extend.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java:
##########
@@ -1544,6 +1552,30 @@ private void notifyJobStatusChange(JobStatus newState) {
         }
     }
 
+    private void notifyJobStatusHooks(JobStatus newState, Throwable cause) {
+        JobID jobID = jobInformation.getJobId();
+        for (JobStatusHook hook : jobStatusHooks) {
+            try {
+                switch (newState) {
+                    case CREATED:
+                        hook.onCreated(jobID);

Review Comment:
   Maybe we should check that `cause` is null for the branches other than FAILED.
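
One way such a check could be written is sketched below, purely as an illustration of the suggestion. `JobStatus` is a reduced local stand-in for Flink's enum, `CauseCheck` and `checkCauseMatchesState` are hypothetical names, and throwing `IllegalStateException` is an assumption standing in for a precondition helper.

```java
// Stand-in for Flink's JobStatus enum, reduced to the states used in this sketch.
enum JobStatus { CREATED, RUNNING, FINISHED, CANCELED, FAILED }

// Hypothetical helper that rejects a failure cause for any state other than FAILED.
class CauseCheck {
    static void checkCauseMatchesState(JobStatus newState, Throwable cause) {
        if (newState != JobStatus.FAILED && cause != null) {
            throw new IllegalStateException(
                    "A failure cause was supplied for non-FAILED state " + newState, cause);
        }
    }
}
```

Calling this before the `switch` would turn a stray cause on, say, a CANCELED transition into an immediate error instead of silently dropping it.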





[GitHub] [flink] reswqa commented on a diff in pull request #20223: [FLINK-28457][runtime] Introduce JobStatusHook

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20223:
URL: https://github.com/apache/flink/pull/20223#discussion_r916792016


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusHook.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+
+import java.io.Serializable;
+
+/** Hooks provided by users on job status changing. */
+@Internal
+public interface JobStatusHook extends Serializable {
+
+    /** When Job become CREATED status. It would only be called one time. */

Review Comment:
   ```suggestion
       /** When Job become {@link JobStatus#CREATED} status. It would only be called one time. */
   ```


