You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/07 14:08:21 UTC
[23/30] flink git commit: [FLINK-7815] Remove grouping from MultipleJobsDetails
http://git-wip-us.apache.org/repos/asf/flink/blob/430fa7b0/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 51d7755..8e33421 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -343,7 +343,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
return combinedJobDetails.thenApply(
(Collection<JobDetails> jobDetails) ->
- new MultipleJobsDetails(jobDetails, null));
+ new MultipleJobsDetails(jobDetails));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/430fa7b0/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetails.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetails.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetails.java
index fcb62f6..3ab9b5a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetails.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetails.java
@@ -19,16 +19,15 @@
package org.apache.flink.runtime.messages.webmonitor;
import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.commons.collections.CollectionUtils;
import java.io.Serializable;
import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
+import java.util.Objects;
/**
* An actor messages describing details of various jobs. This message is sent for example
@@ -38,38 +37,28 @@ public class MultipleJobsDetails implements ResponseBody, Serializable {
private static final long serialVersionUID = -1526236139616019127L;
- public static final String FIELD_NAME_JOBS_RUNNING = "running";
- public static final String FIELD_NAME_JOBS_FINISHED = "finished";
+ public static final String FIELD_NAME_JOBS = "jobs";
- @JsonProperty(FIELD_NAME_JOBS_RUNNING)
- private final Collection<JobDetails> running;
-
- @JsonProperty(FIELD_NAME_JOBS_FINISHED)
- private final Collection<JobDetails> finished;
+ @JsonProperty(FIELD_NAME_JOBS)
+ private final Collection<JobDetails> jobs;
@JsonCreator
public MultipleJobsDetails(
- @JsonProperty(FIELD_NAME_JOBS_RUNNING) Collection<JobDetails> running,
- @JsonProperty(FIELD_NAME_JOBS_FINISHED) Collection<JobDetails> finished) {
- this.running = running == null ? Collections.emptyList() : running;
- this.finished = finished == null ? Collections.emptyList() : finished;
+ @JsonProperty(FIELD_NAME_JOBS) Collection<JobDetails> jobs) {
+ this.jobs = Preconditions.checkNotNull(jobs);
}
// ------------------------------------------------------------------------
- public Collection<JobDetails> getRunning() {
- return running;
- }
- public Collection<JobDetails> getFinished() {
- return finished;
+ public Collection<JobDetails> getJobs() {
+ return jobs;
}
@Override
public String toString() {
return "MultipleJobsDetails{" +
- "running=" + running +
- ", finished=" + finished +
+ "jobs=" + jobs +
'}';
}
@@ -82,31 +71,12 @@ public class MultipleJobsDetails implements ResponseBody, Serializable {
return false;
}
MultipleJobsDetails that = (MultipleJobsDetails) o;
-
- return CollectionUtils.isEqualCollection(running, that.running) &&
- CollectionUtils.isEqualCollection(finished, that.finished);
+ return Objects.equals(jobs, that.jobs);
}
@Override
public int hashCode() {
- // the hash code only depends on the collection elements, not the collection itself!
- int result = 1;
-
- Iterator<JobDetails> iterator = running.iterator();
-
- while (iterator.hasNext()) {
- JobDetails jobDetails = iterator.next();
- result = 31 * result + (jobDetails == null ? 0 : jobDetails.hashCode());
- }
-
- iterator = finished.iterator();
-
- while (iterator.hasNext()) {
- JobDetails jobDetails = iterator.next();
- result = 31 * result + (jobDetails == null ? 0 : jobDetails.hashCode());
- }
-
- return result;
+ return Objects.hash(jobs);
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/430fa7b0/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandler.java
index 6aa75b2..d2b4a6c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandler.java
@@ -80,14 +80,8 @@ public class JobsOverviewHandler extends AbstractJsonRequestHandler {
gen.writeStartObject();
- gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_RUNNING);
- for (JobDetails detail : result.getRunning()) {
- jobDetailsSerializer.serialize(detail, gen, null);
- }
- gen.writeEndArray();
-
- gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_FINISHED);
- for (JobDetails detail : result.getFinished()) {
+ gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS);
+ for (JobDetails detail : result.getJobs()) {
jobDetailsSerializer.serialize(detail, gen, null);
}
gen.writeEndArray();
@@ -116,9 +110,7 @@ public class JobsOverviewHandler extends AbstractJsonRequestHandler {
StringWriter writer = new StringWriter();
try (JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
gen.writeStartObject();
- gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_RUNNING);
- gen.writeEndArray();
- gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS_FINISHED);
+ gen.writeArrayFieldStart(MultipleJobsDetails.FIELD_NAME_JOBS);
final JobDetails.JobDetailsSerializer jobDetailsSerializer = new JobDetails.JobDetailsSerializer();
jobDetailsSerializer.serialize(WebMonitorUtils.createDetailsForJob(graph), gen, null);
http://git-wip-us.apache.org/repos/asf/flink/blob/430fa7b0/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
index e89ed55..313232f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
@@ -113,14 +113,11 @@ public class MetricFetcher<T extends RestfulGateway> {
if (throwable != null) {
LOG.debug("Fetching of JobDetails failed.", throwable);
} else {
- ArrayList<String> activeJobs = new ArrayList<>();
- for (JobDetails job : jobDetails.getRunning()) {
- activeJobs.add(job.getJobId().toString());
+ ArrayList<String> toRetain = new ArrayList<>(jobDetails.getJobs().size());
+ for (JobDetails job : jobDetails.getJobs()) {
+ toRetain.add(job.getJobId().toString());
}
- for (JobDetails job : jobDetails.getFinished()) {
- activeJobs.add(job.getJobId().toString());
- }
- metrics.retainJobs(activeJobs);
+ metrics.retainJobs(toRetain);
}
},
executor);
http://git-wip-us.apache.org/repos/asf/flink/blob/430fa7b0/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 4fb1196..95a3fd5 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1649,10 +1649,10 @@ class JobManager(
case msg : RequestJobDetails =>
- val ourDetails: Array[JobDetails] = if (msg.shouldIncludeRunning()) {
+ val ourDetails: List[JobDetails] = if (msg.shouldIncludeRunning()) {
currentJobs.values.map {
v => WebMonitorUtils.createDetailsForJob(v._1)
- }.toArray[JobDetails]
+ }.toList
} else {
null
}
@@ -1662,11 +1662,10 @@ class JobManager(
future.onSuccess {
case archiveDetails: MultipleJobsDetails =>
theSender ! new MultipleJobsDetails(
- util.Arrays.asList(ourDetails: _*),
- archiveDetails.getFinished())
+ (ourDetails ++ archiveDetails.getJobs.asScala).asJavaCollection)
}(context.dispatcher)
} else {
- theSender ! new MultipleJobsDetails(util.Arrays.asList(ourDetails: _*), null)
+ theSender ! new MultipleJobsDetails(util.Arrays.asList(ourDetails: _*))
}
case _ => log.error("Unrecognized info message " + actorMessage)
http://git-wip-us.apache.org/repos/asf/flink/blob/430fa7b0/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 7a10d01..3885596 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -189,7 +189,7 @@ class MemoryArchivist(
v => WebMonitorUtils.createDetailsForJob(v)
}.toArray[JobDetails]
- theSender ! decorateMessage(new MultipleJobsDetails(null, util.Arrays.asList(details: _*)))
+ theSender ! decorateMessage(new MultipleJobsDetails(util.Arrays.asList(details: _*)))
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/430fa7b0/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
index b11ff6a..673fa49 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/WebMonitorMessagesTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.messages.webmonitor.RequestJobsOverview;
import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
+
import org.junit.Test;
import java.util.ArrayList;
@@ -38,7 +39,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Random;
-import static org.junit.Assert.*;
+import static org.junit.Assert.fail;
public class WebMonitorMessagesTest {
@@ -103,7 +104,7 @@ public class WebMonitorMessagesTest {
try {
final Random rnd = new Random();
GenericMessageTester.testMessageInstance(
- new MultipleJobsDetails(randomJobDetails(rnd), randomJobDetails(rnd)));
+ new MultipleJobsDetails(randomJobDetails(rnd)));
}
catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/flink/blob/430fa7b0/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetailsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetailsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetailsTest.java
index 297575a..829fe01 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetailsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/MultipleJobsDetailsTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
import org.junit.Test;
-import java.util.Collections;
+import java.util.Arrays;
import static org.junit.Assert.assertEquals;
@@ -73,8 +73,7 @@ public class MultipleJobsDetailsTest extends TestLogger {
4);
final MultipleJobsDetails expected = new MultipleJobsDetails(
- Collections.singleton(running),
- Collections.singleton(finished));
+ Arrays.asList(running, finished));
final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
http://git-wip-us.apache.org/repos/asf/flink/blob/430fa7b0/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandlerTest.java
index ea40376..57995dc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobsOverviewHandlerTest.java
@@ -59,13 +59,10 @@ public class JobsOverviewHandlerTest extends TestLogger {
Assert.assertEquals(JobsOverviewHeaders.URL, archive.getPath());
JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(archive.getJson());
- ArrayNode running = (ArrayNode) result.get("running");
- Assert.assertEquals(0, running.size());
+ ArrayNode jobs = (ArrayNode) result.get("jobs");
+ Assert.assertEquals(1, jobs.size());
- ArrayNode finished = (ArrayNode) result.get("finished");
- Assert.assertEquals(1, finished.size());
-
- compareJobOverview(expectedDetails, finished.get(0).toString());
+ compareJobOverview(expectedDetails, jobs.get(0).toString());
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/430fa7b0/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
index e157b0a..72d18b0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
@@ -82,7 +82,7 @@ public class MetricFetcherTest extends TestLogger {
JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class);
when(jobManagerGateway.requestJobDetails(any(Time.class)))
- .thenReturn(CompletableFuture.completedFuture(new MultipleJobsDetails(Collections.emptyList(), Collections.emptyList())));
+ .thenReturn(CompletableFuture.completedFuture(new MultipleJobsDetails(Collections.emptyList())));
when(jobManagerGateway.requestMetricQueryServicePaths(any(Time.class))).thenReturn(
CompletableFuture.completedFuture(Collections.singleton(jmMetricQueryServicePath)));
when(jobManagerGateway.requestTaskManagerMetricQueryServicePaths(any(Time.class))).thenReturn(