You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/07 14:08:24 UTC

[26/30] flink git commit: [FLINK-7815] Remove grouping from MultipleJobsDetails

[FLINK-7815] Remove grouping from MultipleJobsDetails

With this commit, the MultipleJobsDetails instance only contains a list of all jobs
which could be retrieved from the cluster. With this change, it is the responsibility
of the web UI to group the jobs into running and finished jobs.

Adapt the jobs.svc.coffee script to group the list of jobs into running and finished jobs.

This closes #4806.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/430fa7b0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/430fa7b0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/430fa7b0

Branch: refs/heads/master
Commit: 430fa7b0bd24b79a11c221f54f899b8d2bc4b47f
Parents: 8086e3b
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Nov 3 18:27:06 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 7 15:07:44 2017 +0100

----------------------------------------------------------------------
 .../flink/client/program/ClusterClient.java     |  8 +--
 .../client/program/rest/RestClusterClient.java  | 13 +++--
 .../flink/client/program/ClusterClientTest.java |  4 +-
 .../program/rest/RestClusterClientTest.java     |  3 +-
 .../webmonitor/history/HistoryServerTest.java   |  2 +-
 .../app/scripts/modules/jobs/jobs.svc.coffee    | 23 ++++++---
 .../web-dashboard/web/js/hs/index.js            |  4 +-
 flink-runtime-web/web-dashboard/web/js/index.js |  4 +-
 .../flink/runtime/dispatcher/Dispatcher.java    |  2 +-
 .../webmonitor/MultipleJobsDetails.java         | 54 +++++---------------
 .../handler/legacy/JobsOverviewHandler.java     | 14 ++---
 .../handler/legacy/metrics/MetricFetcher.java   | 11 ++--
 .../flink/runtime/jobmanager/JobManager.scala   |  9 ++--
 .../runtime/jobmanager/MemoryArchivist.scala    |  2 +-
 .../messages/WebMonitorMessagesTest.java        |  5 +-
 .../webmonitor/MultipleJobsDetailsTest.java     |  5 +-
 .../handler/legacy/JobsOverviewHandlerTest.java |  9 ++--
 .../legacy/metrics/MetricFetcherTest.java       |  2 +-
 18 files changed, 73 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/430fa7b0/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 62efcfa..5b30223 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -56,6 +56,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
 import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
 import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
 import org.apache.flink.runtime.util.LeaderConnectionInfo;
@@ -705,9 +706,10 @@ public abstract class ClusterClient {
 		return responseFuture.thenApply((responseMessage) -> {
 			if (responseMessage instanceof MultipleJobsDetails) {
 				MultipleJobsDetails details = (MultipleJobsDetails) responseMessage;
-				Collection<JobStatusMessage> flattenedDetails = new ArrayList<>(details.getRunning().size() + details.getFinished().size());
-				details.getRunning().forEach(detail -> flattenedDetails.add(new JobStatusMessage(detail.getJobId(), detail.getJobName(), detail.getStatus(), detail.getStartTime())));
-				details.getFinished().forEach(detail -> flattenedDetails.add(new JobStatusMessage(detail.getJobId(), detail.getJobName(), detail.getStatus(), detail.getStartTime())));
+
+				final Collection<JobDetails> jobDetails = details.getJobs();
+				Collection<JobStatusMessage> flattenedDetails = new ArrayList<>(jobDetails.size());
+				jobDetails.forEach(detail -> flattenedDetails.add(new JobStatusMessage(detail.getJobId(), detail.getJobName(), detail.getStatus(), detail.getStartTime())));
 				return flattenedDetails;
 			} else {
 				throw new CompletionException(

http://git-wip-us.apache.org/repos/asf/flink/blob/430fa7b0/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 012bedc..e21e94b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.RestClient;
 import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
@@ -206,11 +207,13 @@ public class RestClusterClient extends ClusterClient {
 			headers
 		);
 		return jobDetailsFuture
-			.thenApply(details -> {
-				Collection<JobStatusMessage> flattenedDetails = new ArrayList<>();
-				details.getRunning().forEach(detail -> flattenedDetails.add(new JobStatusMessage(detail.getJobId(), detail.getJobName(), detail.getStatus(), detail.getStartTime())));
-				details.getFinished().forEach(detail -> flattenedDetails.add(new JobStatusMessage(detail.getJobId(), detail.getJobName(), detail.getStatus(), detail.getStartTime())));
-				return flattenedDetails;
+			.thenApply(
+				(MultipleJobsDetails multipleJobsDetails) -> {
+					final Collection<JobDetails> jobDetails = multipleJobsDetails.getJobs();
+					Collection<JobStatusMessage> flattenedDetails = new ArrayList<>(jobDetails.size());
+					jobDetails.forEach(detail -> flattenedDetails.add(new JobStatusMessage(detail.getJobId(), detail.getJobName(), detail.getStatus(), detail.getStartTime())));
+
+					return flattenedDetails;
 			});
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/430fa7b0/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
index ec73d76..e696769 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
@@ -35,8 +35,8 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.concurrent.CompletableFuture;
 
@@ -257,7 +257,7 @@ public class ClusterClientTest extends TestLogger {
 		public MultipleJobsDetails process(RequestJobDetails message) {
 			JobDetails running = new JobDetails(new JobID(), "job1", 0, 0, 0, JobStatus.RUNNING, 0, new int[9], 0);
 			JobDetails finished = new JobDetails(new JobID(), "job2", 0, 0, 0, JobStatus.FINISHED, 0, new int[9], 0);
-			return new MultipleJobsDetails(Collections.singleton(running), Collections.singleton(finished));
+			return new MultipleJobsDetails(Arrays.asList(running, finished));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/430fa7b0/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 7e8185c..0542d50 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -68,6 +68,7 @@ import org.junit.Test;
 import javax.annotation.Nonnull;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
@@ -314,7 +315,7 @@ public class RestClusterClientTest extends TestLogger {
 		protected CompletableFuture<MultipleJobsDetails> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
 			JobDetails running = new JobDetails(new JobID(), "job1", 0, 0, 0, JobStatus.RUNNING, 0, new int[9], 0);
 			JobDetails finished = new JobDetails(new JobID(), "job2", 0, 0, 0, JobStatus.FINISHED, 0, new int[9], 0);
-			return CompletableFuture.completedFuture(new MultipleJobsDetails(Collections.singleton(running), Collections.singleton(finished)));
+			return CompletableFuture.completedFuture(new MultipleJobsDetails(Arrays.asList(running, finished)));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/430fa7b0/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index b270ca7..de63b43 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -96,7 +96,7 @@ public class HistoryServerTest extends TestLogger {
 			String response = getFromHTTP(baseUrl + JobsOverviewHeaders.URL);
 			JsonNode overview = mapper.readTree(response);
 
-			String jobID = overview.get("finished").get(0).get("jid").asText();
+			String jobID = overview.get("jobs").get(0).get("jid").asText();
 			JsonNode jobDetails = mapper.readTree(getFromHTTP(baseUrl + "/jobs/" + jobID));
 			Assert.assertNotNull(jobDetails.get("jid"));
 		} finally {

http://git-wip-us.apache.org/repos/asf/flink/blob/430fa7b0/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.svc.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.svc.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.svc.coffee
index f07d9c0..5441730 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.svc.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.svc.coffee
@@ -88,12 +88,23 @@ angular.module('flinkApp')
 
     $http.get flinkConfig.jobServer + "jobs/overview"
     .success (data, status, headers, config) =>
-      angular.forEach data, (list, listKey) =>
-        switch listKey
-          when 'running' then jobs.running = @setEndTimes(list)
-          when 'finished' then jobs.finished = @setEndTimes(list)
-          when 'cancelled' then jobs.cancelled = @setEndTimes(list)
-          when 'failed' then jobs.failed = @setEndTimes(list)
+      # reset job fields
+      jobs.finished = []
+      jobs.running = []
+
+      # group the received list of jobs into running and finished jobs
+      _(data.jobs).groupBy(
+        (x) ->
+          switch x.state.toLowerCase()
+            when 'finished' then 'finished'
+            when 'failed' then 'finished'
+            when 'canceled' then 'finished'
+            else 'running')
+      .forEach((value, key) =>
+        switch key
+          when 'finished' then jobs.finished = @setEndTimes(value)
+          when 'running' then jobs.running = @setEndTimes(value))
+      .value(); # materialize the chain
 
       deferred.resolve(jobs)
       notifyObservers()