Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/09 14:50:40 UTC

[GitHub] [flink] XComp commented on a change in pull request #13560: [FLINK-19518] Show proper job duration for running jobs in web ui

XComp commented on a change in pull request #13560:
URL: https://github.com/apache/flink/pull/13560#discussion_r502259949



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
##########
@@ -98,6 +101,42 @@ public JobDetails(
 		this.tasksPerState = checkNotNull(tasksPerState);
 		this.numTasks = numTasks;
 	}
+
+	public static JobDetails createDetailsForJob(AccessExecutionGraph job) {
+		JobStatus status = job.getState();
+
+		long started = job.getStatusTimestamp(JobStatus.INITIALIZING);
+		long finished = status.isGloballyTerminalState() ? job.getStatusTimestamp(status) : -1L;
+		long duration = (finished >= 0L ? finished : System.currentTimeMillis()) - started;
+
+		int[] countsPerStatus = new int[ExecutionState.values().length];
+		long lastChanged = 0;
+		int numTotalTasks = 0;
+
+		for (AccessExecutionJobVertex ejv : job.getVerticesTopologically()) {
+			AccessExecutionVertex[] vertices = ejv.getTaskVertices();

Review comment:
       ```suggestion
   			AccessExecutionVertex[] taskVertices = ejv.getTaskVertices();
   ```
   Could we rename this variable to improve readability? This code segment deals with different types of vertices (job vertices and task vertices), so a more specific name helps.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java
##########
@@ -98,6 +101,42 @@ public JobDetails(
 		this.tasksPerState = checkNotNull(tasksPerState);
 		this.numTasks = numTasks;
 	}
+
+	public static JobDetails createDetailsForJob(AccessExecutionGraph job) {
+		JobStatus status = job.getState();
+
+		long started = job.getStatusTimestamp(JobStatus.INITIALIZING);
+		long finished = status.isGloballyTerminalState() ? job.getStatusTimestamp(status) : -1L;
+		long duration = (finished >= 0L ? finished : System.currentTimeMillis()) - started;
+
+		int[] countsPerStatus = new int[ExecutionState.values().length];
+		long lastChanged = 0;
+		int numTotalTasks = 0;
+
+		for (AccessExecutionJobVertex ejv : job.getVerticesTopologically()) {
+			AccessExecutionVertex[] vertices = ejv.getTaskVertices();
+			numTotalTasks += vertices.length;
+
+			for (AccessExecutionVertex vertex : vertices) {

Review comment:
       ```suggestion
   			for (AccessExecutionVertex taskVertex : taskVertices) {
   ```
   Same here with the `vertex` variable.
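
For illustration, the two renamings together might read roughly as follows. This is only a sketch: `JobVertexStub` and `TaskVertexStub` are hypothetical stand-ins for `AccessExecutionJobVertex` and `AccessExecutionVertex` so that the snippet compiles without Flink on the classpath, and the loop body (counting per state ordinal) is an assumption, since the quoted hunk does not show it.

```java
import java.util.List;

public class VertexNamingSketch {

    // Hypothetical stand-in for AccessExecutionVertex (a single parallel subtask).
    public static final class TaskVertexStub {
        final int stateOrdinal;

        public TaskVertexStub(int stateOrdinal) {
            this.stateOrdinal = stateOrdinal;
        }
    }

    // Hypothetical stand-in for AccessExecutionJobVertex (one job vertex with its subtasks).
    public static final class JobVertexStub {
        final TaskVertexStub[] taskVertices;

        public JobVertexStub(TaskVertexStub... taskVertices) {
            this.taskVertices = taskVertices;
        }

        TaskVertexStub[] getTaskVertices() {
            return taskVertices;
        }
    }

    // Counts task vertices per state; the names make the two vertex levels explicit.
    public static int[] countTasksPerState(List<JobVertexStub> jobVertices, int numStates) {
        int[] countsPerStatus = new int[numStates];
        for (JobVertexStub ejv : jobVertices) {
            TaskVertexStub[] taskVertices = ejv.getTaskVertices();
            for (TaskVertexStub taskVertex : taskVertices) {
                // Assumed loop body: increment the counter of the task's current state.
                countsPerStatus[taskVertex.stateOrdinal]++;
            }
        }
        return countsPerStatus;
    }
}
```

With `taskVertices` and `taskVertex`, the outer loop variable `ejv` (job level) and the inner one (task level) are no longer easy to confuse.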

##########
File path: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
##########
@@ -303,6 +303,55 @@ public void testCancel() throws Exception {
 		BlockingInvokable.reset();
 	}
 
+	/**
+	 * See FLINK-19518. This test ensures that the /jobs/overview handler shows a duration != 0.
+	 *
+	 */
+	@Test
+	public void testJobOverviewHandler() throws Exception {
+		// this only works if there is no active job at this point
+		assertTrue(getRunningJobs(CLUSTER.getClusterClient()).isEmpty());
+
+		// Create a task
+		final JobVertex sender = new JobVertex("Sender");
+		sender.setParallelism(2);
+		sender.setInvokableClass(BlockingInvokable.class);
+
+		final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender);
+		final JobID jid = jobGraph.getJobID();
+
+		ClusterClient<?> clusterClient = CLUSTER.getClusterClient();
+		clusterClient.submitJob(jobGraph).get();
+
+		// wait for job to show up
+		while (getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
+			Thread.sleep(10);
+		}
+
+		// wait for tasks to be properly running
+		BlockingInvokable.latch.await();
+
+		final Duration testTimeout = Duration.ofMinutes(2);
+		final LocalTime deadline = LocalTime.now().plus(testTimeout);

Review comment:
       The `deadline` variable is never used. Are we missing an `assert` in this test or is this variable obsolete?
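
If the deadline is meant to bound a polling loop, one possible shape is sketched below. This is an assumption about the intent, not the PR's code: `waitUntil` and the condition passed to it are hypothetical names, and `Instant` is swapped in for `LocalTime` because a `LocalTime` deadline wraps around at midnight.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.BooleanSupplier;

public class DeadlinePollingSketch {

    /**
     * Polls the condition until it holds or the timeout elapses.
     * Returns true only if the condition was met before the deadline.
     */
    public static boolean waitUntil(BooleanSupplier condition, Duration timeout) {
        // Instant instead of LocalTime: the deadline stays correct across midnight.
        final Instant deadline = Instant.now().plus(timeout);
        while (!condition.getAsBoolean()) {
            if (Instant.now().isAfter(deadline)) {
                return false;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        final long start = System.currentTimeMillis();
        // Hypothetical stand-in for "the reported job duration is greater than zero".
        boolean met = waitUntil(
            () -> System.currentTimeMillis() - start > 0,
            Duration.ofSeconds(5));
        if (!met) {
            throw new AssertionError("condition not met before the deadline");
        }
        System.out.println("condition met before deadline: " + met);
    }
}
```

In the test, the boolean result could then feed an `assertTrue(...)`, which would give the `deadline` variable a purpose instead of leaving it unused.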

##########
File path: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
##########
@@ -303,6 +303,55 @@ public void testCancel() throws Exception {
 		BlockingInvokable.reset();
 	}
 
+	/**
+	 * See FLINK-19518. This test ensures that the /jobs/overview handler shows a duration != 0.
+	 *

Review comment:
       Minor: the empty ` *` line at the end of this Javadoc comment is not necessary.

##########
File path: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
##########
@@ -303,6 +303,55 @@ public void testCancel() throws Exception {
 		BlockingInvokable.reset();
 	}
 
+	/**
+	 * See FLINK-19518. This test ensures that the /jobs/overview handler shows a duration != 0.
+	 *
+	 */
+	@Test
+	public void testJobOverviewHandler() throws Exception {
+		// this only works if there is no active job at this point
+		assertTrue(getRunningJobs(CLUSTER.getClusterClient()).isEmpty());
+
+		// Create a task
+		final JobVertex sender = new JobVertex("Sender");
+		sender.setParallelism(2);
+		sender.setInvokableClass(BlockingInvokable.class);
+
+		final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender);
+		final JobID jid = jobGraph.getJobID();

Review comment:
       `jid` is never used and can be removed.



