You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/07 14:08:21 UTC

[23/30] flink git commit: [FLINK-7815] Remove grouping from MultipleJobsDetails

http://git-wip-us.apache.org/repos/asf/flink/blob/430fa7b0/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 51d7755..8e33421 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -343,7 +343,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 		return combinedJobDetails.thenApply(
 			(Collection<JobDetails> jobDetails) ->
-				new MultipleJobsDetails(jobDetails, null));
+				new MultipleJobsDetails(jobDetails));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/430fa7b0/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetails.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetails.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetails.java
index fcb62f6..3ab9b5a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetails.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetails.java
@@ -19,16 +19,15 @@
 package org.apache.flink.runtime.messages.webmonitor;
 
 import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
-import org.apache.commons.collections.CollectionUtils;
 
 import java.io.Serializable;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
+import java.util.Objects;
 
 /**
  * An actor messages describing details of various jobs. This message is sent for example
@@ -38,38 +37,28 @@ public class MultipleJobsDetails implements ResponseBody, Serializable {
 
 	private static final long serialVersionUID = -1526236139616019127L;
 	
-	public static final String FIELD_NAME_JOBS_RUNNING = "running";
-	public static final String FIELD_NAME_JOBS_FINISHED = "finished";
+	public static final String FIELD_NAME_JOBS = "jobs";
 
-	@JsonProperty(FIELD_NAME_JOBS_RUNNING)
-	private final Collection<JobDetails> running;
-
-	@JsonProperty(FIELD_NAME_JOBS_FINISHED)
-	private final Collection<JobDetails> finished;
+	@JsonProperty(FIELD_NAME_JOBS)
+	private final Collection<JobDetails> jobs;
 
 	@JsonCreator
 	public MultipleJobsDetails(
-			@JsonProperty(FIELD_NAME_JOBS_RUNNING) Collection<JobDetails> running,
-			@JsonProperty(FIELD_NAME_JOBS_FINISHED) Collection<JobDetails> finished) {
-		this.running = running == null ? Collections.emptyList() : running;
-		this.finished = finished == null ? Collections.emptyList() : finished;
+			@JsonProperty(FIELD_NAME_JOBS) Collection<JobDetails> jobs) {
+		this.jobs = Preconditions.checkNotNull(jobs);
 	}
 	
 	// ------------------------------------------------------------------------
 
-	public Collection<JobDetails> getRunning() {
-		return running;
-	}
 
-	public Collection<JobDetails> getFinished() {
-		return finished;
+	public Collection<JobDetails> getJobs() {
+		return jobs;
 	}
 
 	@Override
 	public String toString() {
 		return "MultipleJobsDetails{" +
-			"running=" + running +
-			", finished=" + finished +
+			"jobs=" + jobs +
 			'}';
 	}
 
@@ -82,31 +71,12 @@ public class MultipleJobsDetails implements ResponseBody, Serializable {
 			return false;
 		}
 		MultipleJobsDetails that = (MultipleJobsDetails) o;
-
-		return CollectionUtils.isEqualCollection(running, that.running) &&
-			CollectionUtils.isEqualCollection(finished, that.finished);
+		return Objects.equals(jobs, that.jobs);
 	}
 
 	@Override
 	public int hashCode() {
-		// the hash code only depends on the collection elements, not the collection itself!
-		int result = 1;
-
-		Iterator<JobDetails> iterator = running.iterator();
-
-		while (iterator.hasNext()) {
-			JobDetails jobDetails = iterator.next();
-			result = 31 * result + (jobDetails == null ? 0 : jobDetails.hashCode());
-		}
-
-		iterator = finished.iterator();
-
-		while (iterator.hasNext()) {
-			JobDetails jobDetails = iterator.next();
-			result = 31 * result + (jobDetails == null ? 0 : jobDetails.hashCode());
-		}
-
-		return result;
+		return Objects.hash(jobs);
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/430fa7b0/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandler.java
index 6aa75b2..d2b4a6c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandler.java
@@ -80,14 +80,8 @@ public class JobsOverviewHandler extends AbstractJsonRequestHandler {
 
 						gen.writeStartObject();
 
-						gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_RUNNING);
-						for (JobDetails detail : result.getRunning()) {
-							jobDetailsSerializer.serialize(detail, gen, null);
-						}
-						gen.writeEndArray();
-
-						gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_FINISHED);
-						for (JobDetails detail : result.getFinished()) {
+						gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS);
+						for (JobDetails detail : result.getJobs()) {
 							jobDetailsSerializer.serialize(detail, gen, null);
 						}
 						gen.writeEndArray();
@@ -116,9 +110,7 @@ public class JobsOverviewHandler extends AbstractJsonRequestHandler {
 			StringWriter writer = new StringWriter();
 			try (JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
 				gen.writeStartObject();
-				gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_RUNNING);
-				gen.writeEndArray();
-				gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_FINISHED);
+				gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS);
 
 				final JobDetails.JobDetailsSerializer jobDetailsSerializer = new JobDetails.JobDetailsSerializer();
 				jobDetailsSerializer.serialize(WebMonitorUtils.createDetailsForJob(graph), gen, null);

http://git-wip-us.apache.org/repos/asf/flink/blob/430fa7b0/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
index e89ed55..313232f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
@@ -113,14 +113,11 @@ public class MetricFetcher<T extends RestfulGateway> {
 						if (throwable != null) {
 							LOG.debug("Fetching of JobDetails failed.", throwable);
 						} else {
-							ArrayList<String> activeJobs = new ArrayList<>();
-							for (JobDetails job : jobDetails.getRunning()) {
-								activeJobs.add(job.getJobId().toString());
+							ArrayList<String> toRetain = new ArrayList<>(jobDetails.getJobs().size());
+							for (JobDetails job : jobDetails.getJobs()) {
+								toRetain.add(job.getJobId().toString());
 							}
-							for (JobDetails job : jobDetails.getFinished()) {
-								activeJobs.add(job.getJobId().toString());
-							}
-							metrics.retainJobs(activeJobs);
+							metrics.retainJobs(toRetain);
 						}
 					},
 					executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/430fa7b0/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 4fb1196..95a3fd5 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1649,10 +1649,10 @@ class JobManager(
 
         case msg : RequestJobDetails => 
           
-          val ourDetails: Array[JobDetails] = if (msg.shouldIncludeRunning()) {
+          val ourDetails: List[JobDetails] = if (msg.shouldIncludeRunning()) {
             currentJobs.values.map {
               v => WebMonitorUtils.createDetailsForJob(v._1)
-            }.toArray[JobDetails]
+            }.toList
           } else {
             null
           }
@@ -1662,11 +1662,10 @@ class JobManager(
             future.onSuccess {
               case archiveDetails: MultipleJobsDetails =>
                 theSender ! new MultipleJobsDetails(
-                  util.Arrays.asList(ourDetails: _*),
-                  archiveDetails.getFinished())
+                  (ourDetails ++ archiveDetails.getJobs.asScala).asJavaCollection)
             }(context.dispatcher)
           } else {
-            theSender ! new MultipleJobsDetails(util.Arrays.asList(ourDetails: _*), null)
+            theSender ! new MultipleJobsDetails(util.Arrays.asList(ourDetails: _*))
           }
           
         case _ => log.error("Unrecognized info message " + actorMessage)

http://git-wip-us.apache.org/repos/asf/flink/blob/430fa7b0/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 7a10d01..3885596 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -189,7 +189,7 @@ class MemoryArchivist(
           v => WebMonitorUtils.createDetailsForJob(v)
         }.toArray[JobDetails]
 
-        theSender ! decorateMessage(new MultipleJobsDetails(null, util.Arrays.asList(details: _*)))
+        theSender ! decorateMessage(new MultipleJobsDetails(util.Arrays.asList(details: _*)))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/430fa7b0/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
index b11ff6a..673fa49 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.messages.webmonitor.RequestJobsOverview;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
 import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview;
 import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
+
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -38,7 +39,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Random;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.fail;
 
 public class WebMonitorMessagesTest {
 	
@@ -103,7 +104,7 @@ public class WebMonitorMessagesTest {
 		try {
 			final Random rnd = new Random();
 			GenericMessageTester.testMessageInstance(
-					new MultipleJobsDetails(randomJobDetails(rnd), randomJobDetails(rnd)));
+					new MultipleJobsDetails(randomJobDetails(rnd)));
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/430fa7b0/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetailsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetailsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetailsTest.java
index 297575a..829fe01 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetailsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetailsTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
 
 import org.junit.Test;
 
-import java.util.Collections;
+import java.util.Arrays;
 
 import static org.junit.Assert.assertEquals;
 
@@ -73,8 +73,7 @@ public class MultipleJobsDetailsTest extends TestLogger {
 			4);
 
 		final MultipleJobsDetails expected = new MultipleJobsDetails(
-			Collections.singleton(running),
-			Collections.singleton(finished));
+			Arrays.asList(running, finished));
 
 		final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/430fa7b0/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandlerTest.java
index ea40376..57995dc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandlerTest.java
@@ -59,13 +59,10 @@ public class JobsOverviewHandlerTest extends TestLogger {
 		Assert.assertEquals(JobsOverviewHeaders.URL, archive.getPath());
 
 		JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(archive.getJson());
-		ArrayNode running = (ArrayNode) result.get("running");
-		Assert.assertEquals(0, running.size());
+		ArrayNode jobs = (ArrayNode) result.get("jobs");
+		Assert.assertEquals(1, jobs.size());
 
-		ArrayNode finished = (ArrayNode) result.get("finished");
-		Assert.assertEquals(1, finished.size());
-
-		compareJobOverview(expectedDetails, finished.get(0).toString());
+		compareJobOverview(expectedDetails, jobs.get(0).toString());
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/430fa7b0/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
index e157b0a..72d18b0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
@@ -82,7 +82,7 @@ public class MetricFetcherTest extends TestLogger {
 		JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
 
 		when(jobManagerGateway.requestJobDetails(any(Time.class)))
-			.thenReturn(CompletableFuture.completedFuture(new MultipleJobsDetails(Collections.emptyList(), Collections.emptyList())));
+			.thenReturn(CompletableFuture.completedFuture(new MultipleJobsDetails(Collections.emptyList())));
 		when(jobManagerGateway.requestMetricQueryServicePaths(any(Time.class))).thenReturn(
 			CompletableFuture.completedFuture(Collections.singleton(jmMetricQueryServicePath)));
 		when(jobManagerGateway.requestTaskManagerMetricQueryServicePaths(any(Time.class))).thenReturn(