You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/25 20:15:14 UTC
[1/6] flink git commit: [FLINK-8266] Add network memory to
ResourceProfile for input/output memory of a task
Repository: flink
Updated Branches:
refs/heads/master c1734f4bf -> 37b4e2cef
[FLINK-8266] Add network memory to ResourceProfile for input/output memory of a task
This closes #5170.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/47e6069d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/47e6069d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/47e6069d
Branch: refs/heads/master
Commit: 47e6069d7a299c02a81f062a7acb6a792b71c146
Parents: a4ecc7f
Author: shuai.xus <sh...@alibaba-inc.com>
Authored: Fri Dec 15 18:43:27 2017 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jan 25 15:33:29 2018 +0100
----------------------------------------------------------------------
.../clusterframework/types/ResourceProfile.java | 36 +++++++++++++++---
.../types/ResourceProfileTest.java | 40 ++++++++++----------
.../flink/yarn/YarnResourceManagerTest.java | 2 +-
3 files changed, 52 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/47e6069d/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index 6eb9af4..87d6fc5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -64,6 +64,9 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
/** How many native memory in mb are needed. */
private final int nativeMemoryInMB;
+ /** Memory in MB used by the task in the slot to communicate with its upstreams and downstreams. Set by the job master. */
+ private final int networkMemoryInMB;
+
/** A extensible field for user specified resources from {@link ResourceSpec}. */
private final Map<String, Resource> extendedResources = new HashMap<>(1);
@@ -76,6 +79,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
* @param heapMemoryInMB The size of the heap memory, in megabytes.
* @param directMemoryInMB The size of the direct memory, in megabytes.
* @param nativeMemoryInMB The size of the native memory, in megabytes.
+ * @param networkMemoryInMB The size of the memory for input and output, in megabytes.
* @param extendedResources The extended resources such as GPU and FPGA
*/
public ResourceProfile(
@@ -83,11 +87,13 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
int heapMemoryInMB,
int directMemoryInMB,
int nativeMemoryInMB,
+ int networkMemoryInMB,
Map<String, Resource> extendedResources) {
this.cpuCores = cpuCores;
this.heapMemoryInMB = heapMemoryInMB;
this.directMemoryInMB = directMemoryInMB;
this.nativeMemoryInMB = nativeMemoryInMB;
+ this.networkMemoryInMB = networkMemoryInMB;
if (extendedResources != null) {
this.extendedResources.putAll(extendedResources);
}
@@ -100,7 +106,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
* @param heapMemoryInMB The size of the heap memory, in megabytes.
*/
public ResourceProfile(double cpuCores, int heapMemoryInMB) {
- this(cpuCores, heapMemoryInMB, 0, 0, Collections.EMPTY_MAP);
+ this(cpuCores, heapMemoryInMB, 0, 0, 0, Collections.EMPTY_MAP);
}
/**
@@ -109,7 +115,12 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
* @param other The ResourceProfile to copy.
*/
public ResourceProfile(ResourceProfile other) {
- this(other.cpuCores, other.heapMemoryInMB, other.directMemoryInMB, other.nativeMemoryInMB, other.extendedResources);
+ this(other.cpuCores,
+ other.heapMemoryInMB,
+ other.directMemoryInMB,
+ other.nativeMemoryInMB,
+ other.networkMemoryInMB,
+ other.extendedResources);
}
// ------------------------------------------------------------------------
@@ -151,12 +162,20 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
}
/**
+ * Get the memory needed for a task to communicate with its upstreams and downstreams, in MB.
+ * @return The network memory in MB
+ */
+ public int getNetworkMemoryInMB() {
+ return networkMemoryInMB;
+ }
+
+ /**
* Get the total memory needed in MB.
*
* @return The total memory in MB
*/
public int getMemoryInMB() {
- return heapMemoryInMB + directMemoryInMB + nativeMemoryInMB;
+ return heapMemoryInMB + directMemoryInMB + nativeMemoryInMB + networkMemoryInMB;
}
/**
@@ -187,7 +206,8 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
if (cpuCores >= required.getCpuCores() &&
heapMemoryInMB >= required.getHeapMemoryInMB() &&
directMemoryInMB >= required.getDirectMemoryInMB() &&
- nativeMemoryInMB >= required.getNativeMemoryInMB()) {
+ nativeMemoryInMB >= required.getNativeMemoryInMB() &&
+ networkMemoryInMB >= required.getNetworkMemoryInMB()) {
for (Map.Entry<String, Resource> resource : required.extendedResources.entrySet()) {
if (!extendedResources.containsKey(resource.getKey()) ||
!extendedResources.get(resource.getKey()).getResourceAggregateType().equals(resource.getValue().getResourceAggregateType()) ||
@@ -241,6 +261,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
result = 31 * result + heapMemoryInMB;
result = 31 * result + directMemoryInMB;
result = 31 * result + nativeMemoryInMB;
+ result = 31 * result + networkMemoryInMB;
result = 31 * result + extendedResources.hashCode();
return result;
}
@@ -255,6 +276,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
return this.cpuCores == that.cpuCores &&
this.heapMemoryInMB == that.heapMemoryInMB &&
this.directMemoryInMB == that.directMemoryInMB &&
+ this.networkMemoryInMB == that.networkMemoryInMB &&
Objects.equals(extendedResources, that.extendedResources);
}
return false;
@@ -270,11 +292,12 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
"cpuCores=" + cpuCores +
", heapMemoryInMB=" + heapMemoryInMB +
", directMemoryInMB=" + directMemoryInMB +
- ", nativeMemoryInMB=" + nativeMemoryInMB + resources +
+ ", nativeMemoryInMB=" + nativeMemoryInMB +
+ ", networkMemoryInMB=" + networkMemoryInMB + resources +
'}';
}
- static ResourceProfile fromResourceSpec(ResourceSpec resourceSpec) {
+ static ResourceProfile fromResourceSpec(ResourceSpec resourceSpec, int networkMemory) {
Map<String, Resource> copiedExtendedResources = new HashMap<>(resourceSpec.getExtendedResources());
return new ResourceProfile(
@@ -282,6 +305,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
resourceSpec.getHeapMemory(),
resourceSpec.getDirectMemory(),
resourceSpec.getNativeMemory(),
+ networkMemory,
copiedExtendedResources);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/47e6069d/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
index 25cb5fb..7ed688a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
@@ -31,10 +31,10 @@ public class ResourceProfileTest {
@Test
public void testMatchRequirement() throws Exception {
- ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, Collections.EMPTY_MAP);
- ResourceProfile rp2 = new ResourceProfile(1.0, 200, 200, 200, Collections.EMPTY_MAP);
- ResourceProfile rp3 = new ResourceProfile(2.0, 100, 100, 100, Collections.EMPTY_MAP);
- ResourceProfile rp4 = new ResourceProfile(2.0, 200, 200, 200, Collections.EMPTY_MAP);
+ ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, 0, Collections.EMPTY_MAP);
+ ResourceProfile rp2 = new ResourceProfile(1.0, 200, 200, 200, 0, Collections.EMPTY_MAP);
+ ResourceProfile rp3 = new ResourceProfile(2.0, 100, 100, 100, 0, Collections.EMPTY_MAP);
+ ResourceProfile rp4 = new ResourceProfile(2.0, 200, 200, 200, 0, Collections.EMPTY_MAP);
assertFalse(rp1.isMatching(rp2));
assertTrue(rp2.isMatching(rp1));
@@ -50,6 +50,9 @@ public class ResourceProfileTest {
assertTrue(rp4.isMatching(rp3));
assertTrue(rp4.isMatching(rp4));
+ ResourceProfile rp5 = new ResourceProfile(2.0, 100, 100, 100, 100, null);
+ assertFalse(rp4.isMatching(rp5));
+
ResourceSpec rs1 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
@@ -61,10 +64,9 @@ public class ResourceProfileTest {
setGPUResource(1.1).
build();
-
- assertFalse(rp1.isMatching(ResourceProfile.fromResourceSpec(rs1)));
- assertTrue(ResourceProfile.fromResourceSpec(rs1).isMatching(ResourceProfile.fromResourceSpec(rs2)));
- assertFalse(ResourceProfile.fromResourceSpec(rs2).isMatching(ResourceProfile.fromResourceSpec(rs1)));
+ assertFalse(rp1.isMatching(ResourceProfile.fromResourceSpec(rs1, 0)));
+ assertTrue(ResourceProfile.fromResourceSpec(rs1, 0).isMatching(ResourceProfile.fromResourceSpec(rs2, 0)));
+ assertFalse(ResourceProfile.fromResourceSpec(rs2, 0).isMatching(ResourceProfile.fromResourceSpec(rs1, 0)));
}
@Test
@@ -76,7 +78,7 @@ public class ResourceProfileTest {
public void testEquals() throws Exception {
ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
- assertTrue(ResourceProfile.fromResourceSpec(rs1).equals(ResourceProfile.fromResourceSpec(rs2)));
+ assertTrue(ResourceProfile.fromResourceSpec(rs1, 0).equals(ResourceProfile.fromResourceSpec(rs2, 0)));
ResourceSpec rs3 = ResourceSpec.newBuilder().
setCpuCores(1.0).
@@ -88,37 +90,37 @@ public class ResourceProfileTest {
setHeapMemoryInMB(100).
setGPUResource(1.1).
build();
- assertFalse(ResourceProfile.fromResourceSpec(rs3).equals(ResourceProfile.fromResourceSpec(rs4)));
+ assertFalse(ResourceProfile.fromResourceSpec(rs3, 0).equals(ResourceProfile.fromResourceSpec(rs4, 0)));
ResourceSpec rs5 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setGPUResource(2.2).
build();
- assertTrue(ResourceProfile.fromResourceSpec(rs3).equals(ResourceProfile.fromResourceSpec(rs5)));
+ assertTrue(ResourceProfile.fromResourceSpec(rs3, 100).equals(ResourceProfile.fromResourceSpec(rs5, 100)));
}
@Test
public void testCompareTo() throws Exception {
ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
- assertEquals(0, ResourceProfile.fromResourceSpec(rs1).compareTo(ResourceProfile.fromResourceSpec(rs2)));
+ assertEquals(0, ResourceProfile.fromResourceSpec(rs1, 0).compareTo(ResourceProfile.fromResourceSpec(rs2, 0)));
ResourceSpec rs3 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setGPUResource(2.2).
build();
- assertEquals(-1, ResourceProfile.fromResourceSpec(rs1).compareTo(ResourceProfile.fromResourceSpec(rs3)));
- assertEquals(1, ResourceProfile.fromResourceSpec(rs3).compareTo(ResourceProfile.fromResourceSpec(rs1)));
+ assertEquals(-1, ResourceProfile.fromResourceSpec(rs1, 0).compareTo(ResourceProfile.fromResourceSpec(rs3, 0)));
+ assertEquals(1, ResourceProfile.fromResourceSpec(rs3, 0).compareTo(ResourceProfile.fromResourceSpec(rs1, 0)));
ResourceSpec rs4 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setGPUResource(1.1).
build();
- assertEquals(1, ResourceProfile.fromResourceSpec(rs3).compareTo(ResourceProfile.fromResourceSpec(rs4)));
- assertEquals(-1, ResourceProfile.fromResourceSpec(rs4).compareTo(ResourceProfile.fromResourceSpec(rs3)));
+ assertEquals(1, ResourceProfile.fromResourceSpec(rs3, 0).compareTo(ResourceProfile.fromResourceSpec(rs4, 0)));
+ assertEquals(-1, ResourceProfile.fromResourceSpec(rs4, 0).compareTo(ResourceProfile.fromResourceSpec(rs3, 0)));
ResourceSpec rs5 = ResourceSpec.newBuilder().
@@ -126,7 +128,7 @@ public class ResourceProfileTest {
setHeapMemoryInMB(100).
setGPUResource(2.2).
build();
- assertEquals(0, ResourceProfile.fromResourceSpec(rs3).compareTo(ResourceProfile.fromResourceSpec(rs5)));
+ assertEquals(0, ResourceProfile.fromResourceSpec(rs3, 0).compareTo(ResourceProfile.fromResourceSpec(rs5, 0)));
}
@Test
@@ -136,10 +138,10 @@ public class ResourceProfileTest {
setHeapMemoryInMB(100).
setGPUResource(1.6).
build();
- ResourceProfile rp = ResourceProfile.fromResourceSpec(rs);
+ ResourceProfile rp = ResourceProfile.fromResourceSpec(rs, 50);
assertEquals(1.0, rp.getCpuCores(), 0.000001);
- assertEquals(100, rp.getMemoryInMB());
+ assertEquals(150, rp.getMemoryInMB());
assertEquals(100, rp.getOperatorsMemoryInMB());
assertEquals(1.6, rp.getExtendedResources().get(ResourceSpec.GPU_NAME).getValue(), 0.000001);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/47e6069d/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index cd08fb9..1e70169 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -347,7 +347,7 @@ public class YarnResourceManagerTest extends TestLogger {
final SlotReport slotReport = new SlotReport(
new SlotStatus(
new SlotID(taskManagerResourceId, 1),
- new ResourceProfile(10, 1, 1, 1, Collections.emptyMap())));
+ new ResourceProfile(10, 1, 1, 1, 0, Collections.emptyMap())));
CompletableFuture<Integer> numberRegisteredSlotsFuture = rmGateway
.registerTaskExecutor(
[3/6] flink git commit: [hotfix] [qs] Combine logging statements in
QueryableStateUtils#createKvStateServer
Posted by tr...@apache.org.
[hotfix] [qs] Combine logging statements in QueryableStateUtils#createKvStateServer
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5757942d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5757942d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5757942d
Branch: refs/heads/master
Commit: 5757942d598afa4b2df6a0beb8da38052dbbee10
Parents: a354a9a
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jan 25 10:56:35 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jan 25 15:33:30 2018 +0100
----------------------------------------------------------------------
.../runtime/query/QueryableStateUtils.java | 24 +++++++++++++-------
1 file changed, 16 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5757942d/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
index adbe15d..c3f83447 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
@@ -73,10 +73,14 @@ public final class QueryableStateUtils {
KvStateRequestStats.class);
return constructor.newInstance(address, ports, eventLoopThreads, queryThreads, stats);
} catch (ClassNotFoundException e) {
- LOG.warn("Could not load Queryable State Client Proxy. " +
- "Probable reason: flink-queryable-state-runtime is not in the classpath. " +
- "Please put the corresponding jar from the opt to the lib folder.");
- LOG.debug("Caught exception", e);
+ final String msg = "Could not load Queryable State Client Proxy. " +
+ "Probable reason: flink-queryable-state-runtime is not in the classpath. " +
+ "Please put the corresponding jar from the opt to the lib folder.";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(msg, e);
+ } else {
+ LOG.info(msg);
+ }
return null;
} catch (InvocationTargetException e) {
LOG.error("Queryable State Client Proxy could not be created: ", e.getTargetException());
@@ -128,10 +132,14 @@ public final class QueryableStateUtils {
KvStateRequestStats.class);
return constructor.newInstance(address, ports, eventLoopThreads, queryThreads, kvStateRegistry, stats);
} catch (ClassNotFoundException e) {
- LOG.warn("Could not load Queryable State Server. " +
- "Probable reason: flink-queryable-state-runtime is not in the classpath. " +
- "Please put the corresponding jar from the opt to the lib folder.");
- LOG.debug("Caught exception", e);
+ final String msg = "Could not load Queryable State Server. " +
+ "Probable reason: flink-queryable-state-runtime is not in the classpath. " +
+ "Please put the corresponding jar from the opt to the lib folder.";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(msg, e);
+ } else {
+ LOG.info(msg);
+ }
return null;
} catch (InvocationTargetException e) {
LOG.error("Queryable State Server could not be created: ", e.getTargetException());
[5/6] flink git commit: [FLINK-7858][flip6] Port
JobVertexTaskManagersHandler to REST endpoint
Posted by tr...@apache.org.
[FLINK-7858][flip6] Port JobVertexTaskManagersHandler to REST endpoint
This closes #5149.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/056c72af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/056c72af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/056c72af
Branch: refs/heads/master
Commit: 056c72af994bc0b7bd838faff6b2991763fc2ac1
Parents: 5757942
Author: zjureel <zj...@gmail.com>
Authored: Tue Dec 19 16:56:50 2017 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jan 25 15:33:30 2018 +0100
----------------------------------------------------------------------
.../job/JobVertexTaskManagersHandler.java | 165 ++++++++++++++++++
.../messages/JobVertexTaskManagersHeaders.java | 72 ++++++++
.../messages/JobVertexTaskManagersInfo.java | 171 +++++++++++++++++++
.../runtime/webmonitor/WebMonitorEndpoint.java | 13 ++
.../messages/JobVertexTaskManagersInfoTest.java | 65 +++++++
5 files changed, 486 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/056c72af/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
new file mode 100644
index 0000000..9b59e8d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobVertexTaskManagersInfo;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns, for a single job vertex, the state counts, timing and I/O metrics of its subtasks aggregated per TaskManager.
+ */
+public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<JobVertexTaskManagersInfo, JobVertexMessageParameters> {
+ private MetricFetcher<?> metricFetcher;
+
+ public JobVertexTaskManagersHandler(
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ Time timeout,
+ Map<String, String> responseHeaders,
+ MessageHeaders<EmptyRequestBody, JobVertexTaskManagersInfo, JobVertexMessageParameters> messageHeaders,
+ ExecutionGraphCache executionGraphCache,
+ Executor executor,
+ MetricFetcher<?> metricFetcher) {
+ super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
+ this.metricFetcher = metricFetcher;
+ }
+
+ @Override
+ protected JobVertexTaskManagersInfo handleRequest(
+ HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> request,
+ AccessExecutionGraph executionGraph) throws RestHandlerException {
+ JobID jobID = request.getPathParameter(JobIDPathParameter.class);
+ JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);
+ AccessExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexID);
+
+ // Group the vertex's subtasks by TaskManager, keyed by "hostname:dataPort", or "(unassigned)" when a subtask has no assigned location
+ Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>();
+ for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
+ TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
+ String taskManager = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort();
+ List<AccessExecutionVertex> vertices = taskManagerVertices.get(taskManager);
+ if (vertices == null) {
+ vertices = new ArrayList<>();
+ taskManagerVertices.put(taskManager, vertices);
+ }
+
+ vertices.add(vertex);
+ }
+
+ final long now = System.currentTimeMillis();
+
+ List<JobVertexTaskManagersInfo.TaskManagersInfo> taskManagersInfoList = new ArrayList<>();
+ for (Map.Entry<String, List<AccessExecutionVertex>> entry : taskManagerVertices.entrySet()) {
+ String host = entry.getKey();
+ List<AccessExecutionVertex> taskVertices = entry.getValue();
+
+ int[] tasksPerState = new int[ExecutionState.values().length];
+
+ long startTime = Long.MAX_VALUE;
+ long endTime = 0;
+ boolean allFinished = true;
+
+ MutableIOMetrics counts = new MutableIOMetrics();
+
+ for (AccessExecutionVertex vertex : taskVertices) {
+ final ExecutionState state = vertex.getExecutionState();
+ tasksPerState[state.ordinal()]++;
+
+ // take the earliest DEPLOYING timestamp as the start time; non-positive timestamps are skipped
+ long started = vertex.getStateTimestamp(ExecutionState.DEPLOYING);
+ if (started > 0) {
+ startTime = Math.min(startTime, started);
+ }
+
+ allFinished &= state.isTerminal();
+ endTime = Math.max(endTime, vertex.getStateTimestamp(state));
+
+ counts.addIOMetrics(
+ vertex.getCurrentExecutionAttempt(),
+ metricFetcher,
+ jobID.toString(),
+ jobVertex.getJobVertexId().toString());
+ }
+
+ long duration;
+ if (startTime < Long.MAX_VALUE) {
+ if (allFinished) {
+ duration = endTime - startTime;
+ }
+ else {
+ endTime = -1L;
+ duration = now - startTime;
+ }
+ }
+ else {
+ startTime = -1L;
+ endTime = -1L;
+ duration = -1L;
+ }
+
+ ExecutionState jobVertexState =
+ ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, taskVertices.size());
+ final IOMetricsInfo jobVertexMetrics = new IOMetricsInfo(
+ counts.getNumBytesInLocal() + counts.getNumBytesInRemote(),
+ counts.isNumBytesInLocalComplete() && counts.isNumBytesInRemoteComplete(),
+ counts.getNumBytesOut(),
+ counts.isNumBytesOutComplete(),
+ counts.getNumRecordsIn(),
+ counts.isNumRecordsInComplete(),
+ counts.getNumRecordsOut(),
+ counts.isNumRecordsOutComplete());
+
+ Map<ExecutionState, Integer> statusCounts = new HashMap<>();
+ for (ExecutionState state : ExecutionState.values()) {
+ statusCounts.put(state, tasksPerState[state.ordinal()]);
+ }
+ taskManagersInfoList.add(new JobVertexTaskManagersInfo.TaskManagersInfo(host, jobVertexState, startTime, endTime, duration, jobVertexMetrics, statusCounts));
+ }
+
+ return new JobVertexTaskManagersInfo(jobVertexID, jobVertex.getName(), now, taskManagersInfoList);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/056c72af/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java
new file mode 100644
index 0000000..311d047
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.JobVertexTaskManagersHandler;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link JobVertexTaskManagersHandler}.
+ */
+public class JobVertexTaskManagersHeaders implements MessageHeaders<EmptyRequestBody, JobVertexTaskManagersInfo, JobVertexMessageParameters> {
+
+ private static final JobVertexTaskManagersHeaders INSTANCE = new JobVertexTaskManagersHeaders();
+
+ public static final String URL = "/jobs" +
+ "/:" + JobIDPathParameter.KEY +
+ "/vertices" +
+ "/:" + JobVertexIdPathParameter.KEY +
+ "/taskmanagers";
+
+ @Override
+ public Class<EmptyRequestBody> getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ @Override
+ public Class<JobVertexTaskManagersInfo> getResponseClass() {
+ return JobVertexTaskManagersInfo.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public JobVertexMessageParameters getUnresolvedMessageParameters() {
+ return new JobVertexMessageParameters();
+ }
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.GET;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return URL;
+ }
+
+ public static JobVertexTaskManagersHeaders getInstance() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/056c72af/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java
new file mode 100644
index 0000000..fc30155
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.job.JobVertexTaskManagersHandler;
+import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Response type of the {@link JobVertexTaskManagersHandler}: the job vertex id and name, the "now" timestamp, and a list of {@link TaskManagersInfo} entries.
+ */
+public class JobVertexTaskManagersInfo implements ResponseBody {
+ public static final String VERTEX_TASK_FIELD_ID = "id"; // JSON property names of the top-level response object
+ public static final String VERTEX_TASK_FIELD_NAME = "name";
+ public static final String VERTEX_TASK_FIELD_NOW = "now";
+ public static final String VERTEX_TASK_FIELD_TASK_MANAGERS = "taskmanagers";
+
+ @JsonProperty(VERTEX_TASK_FIELD_ID)
+ @JsonSerialize(using = JobVertexIDSerializer.class)
+ private final JobVertexID jobVertexID; // written with JobVertexIDSerializer, read back with JobVertexIDDeserializer in the creator below
+
+ @JsonProperty(VERTEX_TASK_FIELD_NAME)
+ private final String name;
+
+ @JsonProperty(VERTEX_TASK_FIELD_NOW)
+ private final long now;
+
+ @JsonProperty(VERTEX_TASK_FIELD_TASK_MANAGERS)
+ private List<TaskManagersInfo> taskManagers;
+
+ @JsonCreator
+ public JobVertexTaskManagersInfo(
+ @JsonDeserialize(using = JobVertexIDDeserializer.class) @JsonProperty(VERTEX_TASK_FIELD_ID) JobVertexID jobVertexID,
+ @JsonProperty(VERTEX_TASK_FIELD_NAME) String name,
+ @JsonProperty(VERTEX_TASK_FIELD_NOW) long now,
+ @JsonProperty(VERTEX_TASK_FIELD_TASK_MANAGERS) List<TaskManagersInfo> taskManagers) {
+ this.jobVertexID = checkNotNull(jobVertexID);
+ this.name = checkNotNull(name);
+ this.now = now;
+ this.taskManagers = checkNotNull(taskManagers);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) { // exact class match required, so a subclass instance never compares equal
+ return false;
+ }
+ JobVertexTaskManagersInfo that = (JobVertexTaskManagersInfo) o;
+ return Objects.equals(jobVertexID, that.jobVertexID) &&
+ Objects.equals(name, that.name) &&
+ now == that.now &&
+ Objects.equals(taskManagers, that.taskManagers);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(jobVertexID, name, now, taskManagers);
+ }
+
+ // ---------------------------------------------------
+ // Static inner classes
+ // ---------------------------------------------------
+
+ /**
+ * One entry of the "taskmanagers" list: host label, state, timing, I/O metrics and per-state task counts for a single task manager.
+ */
+ public static class TaskManagersInfo {
+ public static final String TASK_MANAGERS_FIELD_HOST = "host"; // JSON property names of one "taskmanagers" entry
+ public static final String TASK_MANAGERS_FIELD_STATUS = "status";
+ public static final String TASK_MANAGERS_FIELD_START_TIME = "start-time";
+ public static final String TASK_MANAGERS_FIELD_END_TIME = "end-time";
+ public static final String TASK_MANAGERS_FIELD_DURATION = "duration";
+ public static final String TASK_MANAGERS_FIELD_METRICS = "metrics";
+ public static final String TASK_MANAGERS_FIELD_STATUS_COUNTS = "status-counts";
+
+ @JsonProperty(TASK_MANAGERS_FIELD_HOST)
+ private final String host; // as filled in by JobVertexTaskManagersHandler: "hostname:dataPort", or "(unassigned)" when a task has no location
+
+ @JsonProperty(TASK_MANAGERS_FIELD_STATUS)
+ private final ExecutionState status; // the handler passes the result of ExecutionJobVertex.getAggregateJobVertexState for this entry's tasks
+
+ @JsonProperty(TASK_MANAGERS_FIELD_START_TIME)
+ private final long startTime;
+
+ @JsonProperty(TASK_MANAGERS_FIELD_END_TIME)
+ private final long endTime;
+
+ @JsonProperty(TASK_MANAGERS_FIELD_DURATION)
+ private final long duration; // NOTE(review): the handler can pass -1 here; the exact condition is not visible in this patch - confirm
+
+ @JsonProperty(TASK_MANAGERS_FIELD_METRICS)
+ private final IOMetricsInfo metrics;
+
+ @JsonProperty(TASK_MANAGERS_FIELD_STATUS_COUNTS)
+ private final Map<ExecutionState, Integer> statusCounts; // task count per state; the handler puts an entry for every ExecutionState value
+
+ @JsonCreator
+ public TaskManagersInfo(
+ @JsonProperty(TASK_MANAGERS_FIELD_HOST) String host,
+ @JsonProperty(TASK_MANAGERS_FIELD_STATUS) ExecutionState status,
+ @JsonProperty(TASK_MANAGERS_FIELD_START_TIME) long startTime,
+ @JsonProperty(TASK_MANAGERS_FIELD_END_TIME) long endTime,
+ @JsonProperty(TASK_MANAGERS_FIELD_DURATION) long duration,
+ @JsonProperty(TASK_MANAGERS_FIELD_METRICS) IOMetricsInfo metrics,
+ @JsonProperty(TASK_MANAGERS_FIELD_STATUS_COUNTS) Map<ExecutionState, Integer> statusCounts) {
+ this.host = checkNotNull(host); // every reference-typed argument is null-checked; the three longs are stored as given
+ this.status = checkNotNull(status);
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.duration = duration;
+ this.metrics = checkNotNull(metrics);
+ this.statusCounts = checkNotNull(statusCounts);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) { // exact class match required
+ return false;
+ }
+ TaskManagersInfo that = (TaskManagersInfo) o;
+ return Objects.equals(host, that.host) && // compares all seven fields, the same set that hashCode() hashes
+ Objects.equals(status, that.status) &&
+ startTime == that.startTime &&
+ endTime == that.endTime &&
+ duration == that.duration &&
+ Objects.equals(metrics, that.metrics) &&
+ Objects.equals(statusCounts, that.statusCounts);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(host, status, startTime, endTime, duration, metrics, statusCounts);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/056c72af/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index e432752..30a68d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler;
import org.apache.flink.runtime.rest.handler.job.JobIdsHandler;
import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
+import org.apache.flink.runtime.rest.handler.job.JobVertexTaskManagersHandler;
import org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler;
import org.apache.flink.runtime.rest.handler.job.SubtaskCurrentAttemptDetailsHandler;
import org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptAccumulatorsHandler;
@@ -69,6 +70,7 @@ import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
import org.apache.flink.runtime.rest.messages.JobIdsWithStatusesOverviewHeaders;
import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsHeaders;
+import org.apache.flink.runtime.rest.messages.JobVertexTaskManagersHeaders;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.runtime.rest.messages.SubtasksTimesHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
@@ -350,6 +352,16 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
responseHeaders,
metricFetcher);
+ final JobVertexTaskManagersHandler jobVertexTaskManagersHandler = new JobVertexTaskManagersHandler(
+ restAddressFuture,
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ JobVertexTaskManagersHeaders.getInstance(),
+ executionGraphCache,
+ executor,
+ metricFetcher);
+
final JobExecutionResultHandler jobExecutionResultHandler = new JobExecutionResultHandler(
restAddressFuture,
leaderRetriever,
@@ -446,6 +458,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
handlers.add(Tuple2.of(SubtaskExecutionAttemptDetailsHeaders.getInstance(), subtaskExecutionAttemptDetailsHandler));
handlers.add(Tuple2.of(SubtaskExecutionAttemptAccumulatorsHeaders.getInstance(), subtaskExecutionAttemptAccumulatorsHandler));
handlers.add(Tuple2.of(SubtaskCurrentAttemptDetailsHeaders.getInstance(), subtaskCurrentAttemptDetailsHandler));
+ handlers.add(Tuple2.of(JobVertexTaskManagersHeaders.getInstance(), jobVertexTaskManagersHandler));
// This handler MUST be added last, as it otherwise masks all subsequent GET handlers
optWebContent.ifPresent(
http://git-wip-us.apache.org/repos/asf/flink/blob/056c72af/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfoTest.java
new file mode 100644
index 0000000..1a7b521
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfoTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import static org.apache.flink.runtime.rest.messages.JobVertexTaskManagersInfo.TaskManagersInfo;
+
+/**
+ * Tests that the {@link JobVertexTaskManagersInfo} can be marshalled and unmarshalled, using one task manager entry that carries a count for every {@link ExecutionState}.
+ */
+public class JobVertexTaskManagersInfoTest extends RestResponseMarshallingTestBase<JobVertexTaskManagersInfo> {
+ @Override
+ protected Class<JobVertexTaskManagersInfo> getTestResponseClass() {
+ return JobVertexTaskManagersInfo.class;
+ }
+
+ @Override
+ protected JobVertexTaskManagersInfo getTestResponseInstance() throws Exception {
+ final Random random = new Random(); // unseeded, so the metric values below differ from run to run
+ List<TaskManagersInfo> taskManagersInfoList = new ArrayList<>();
+
+ final Map<ExecutionState, Integer> statusCounts = new HashMap<>(ExecutionState.values().length); // capacity hint: one entry per state
+ final IOMetricsInfo jobVertexMetrics = new IOMetricsInfo( // four (long value, boolean flag) pairs, all random
+ random.nextLong(),
+ random.nextBoolean(),
+ random.nextLong(),
+ random.nextBoolean(),
+ random.nextLong(),
+ random.nextBoolean(),
+ random.nextLong(),
+ random.nextBoolean());
+ int count = 100; // arbitrary start; the post-increment below gives every state a distinct count
+ for (ExecutionState executionState : ExecutionState.values()) {
+ statusCounts.put(executionState, count++);
+ }
+ taskManagersInfoList.add(new TaskManagersInfo("host1", ExecutionState.CANCELING, 1L, 2L, 3L, jobVertexMetrics, statusCounts)); // fixed host, state and times; a single entry populates the list
+
+ return new JobVertexTaskManagersInfo(new JobVertexID(), "test", System.currentTimeMillis(), taskManagersInfoList);
+ }
+}
[6/6] flink git commit: [FLINK-7858][flip6] Return with HTTP 404 if
job or jobvertex are unknown
Posted by tr...@apache.org.
[FLINK-7858][flip6] Return with HTTP 404 if job or jobvertex are unknown
Annotate AccessExecutionGraph#getJobVertex(JobVertexID) with @Nullable.
Throw NotFoundException in JobVertexTaskManagersHandler if jobvertexId is unknown.
Throw NotFoundException in AbstractExecutionGraphHandler if jobId is unknown.
Copy Javadoc from legacy JobVertexTaskManagersHandler.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/37b4e2ce
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/37b4e2ce
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/37b4e2ce
Branch: refs/heads/master
Commit: 37b4e2cef687160f2bc7cedb7d2360825089569e
Parents: 056c72a
Author: gyao <ga...@data-artisans.com>
Authored: Wed Jan 24 12:24:35 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jan 25 15:55:54 2018 +0100
----------------------------------------------------------------------
.../executiongraph/AccessExecutionGraph.java | 3 +-
.../flink/runtime/rest/NotFoundException.java | 4 ++
.../job/AbstractExecutionGraphHandler.java | 15 ++++++-
.../job/JobVertexTaskManagersHandler.java | 41 +++++++++++++-------
.../messages/JobVertexTaskManagersHeaders.java | 2 +
.../messages/JobVertexTaskManagersInfo.java | 12 +++---
6 files changed, 54 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
index 362afa1..8d1fa1d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
@@ -76,8 +76,9 @@ public interface AccessExecutionGraph {
* Returns the job vertex for the given {@link JobVertexID}.
*
* @param id id of job vertex to be returned
- * @return job vertex for the given id, or null
+ * @return job vertex for the given id, or {@code null}
*/
+ @Nullable
AccessExecutionJobVertex getJobVertex(JobVertexID id);
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java
index 50060b0..f9db334 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java
@@ -33,4 +33,8 @@ public class NotFoundException extends RestHandlerException {
public NotFoundException(String message) {
super(message, HttpResponseStatus.NOT_FOUND);
}
+
+ public NotFoundException(String message, Throwable cause) {
+ super(message, HttpResponseStatus.NOT_FOUND, cause);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
index 7192832..7c42af1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.rest.handler.job;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.rest.NotFoundException;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
@@ -32,6 +34,7 @@ import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nonnull;
@@ -79,8 +82,16 @@ public abstract class AbstractExecutionGraphHandler<R extends ResponseBody, M ex
} catch (RestHandlerException rhe) {
throw new CompletionException(rhe);
}
- },
- executor);
+ }, executor)
+ .exceptionally(throwable -> {
+ throwable = ExceptionUtils.stripCompletionException(throwable);
+ if (throwable instanceof FlinkJobNotFoundException) {
+ throw new CompletionException(
+ new NotFoundException(String.format("Job %s not found", jobId), throwable));
+ } else {
+ throw new CompletionException(throwable);
+ }
+ });
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
index 9b59e8d..24650a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.NotFoundException;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
@@ -41,6 +42,7 @@ import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
import java.util.ArrayList;
import java.util.HashMap;
@@ -50,7 +52,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
/**
- * Request handler for the job vertex task managers.
+ * A request handler that provides the details of a job vertex, including id, name, and the
+ * runtime and metrics of all its subtasks aggregated by TaskManager.
*/
public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<JobVertexTaskManagersInfo, JobVertexMessageParameters> {
private MetricFetcher<?> metricFetcher;
@@ -65,7 +68,7 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<
Executor executor,
MetricFetcher<?> metricFetcher) {
super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
- this.metricFetcher = metricFetcher;
+ this.metricFetcher = Preconditions.checkNotNull(metricFetcher);
}
@Override
@@ -76,23 +79,24 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<
JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);
AccessExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexID);
+ if (jobVertex == null) {
+ throw new NotFoundException(String.format("JobVertex %s not found", jobVertexID));
+ }
+
// Build a map that groups tasks by TaskManager
Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>();
for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
- String taskManager = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort();
- List<AccessExecutionVertex> vertices = taskManagerVertices.get(taskManager);
- if (vertices == null) {
- vertices = new ArrayList<>();
- taskManagerVertices.put(taskManager, vertices);
- }
-
+ String taskManager = location == null ? "(unassigned)" : location.getHostname() + ':' + location.dataPort();
+ List<AccessExecutionVertex> vertices = taskManagerVertices.computeIfAbsent(
+ taskManager,
+ ignored -> new ArrayList<>(4));
vertices.add(vertex);
}
final long now = System.currentTimeMillis();
- List<JobVertexTaskManagersInfo.TaskManagersInfo> taskManagersInfoList = new ArrayList<>();
+ List<JobVertexTaskManagersInfo.TaskManagersInfo> taskManagersInfoList = new ArrayList<>(4);
for (Map.Entry<String, List<AccessExecutionVertex>> entry : taskManagerVertices.entrySet()) {
String host = entry.getKey();
List<AccessExecutionVertex> taskVertices = entry.getValue();
@@ -141,8 +145,10 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<
duration = -1L;
}
- ExecutionState jobVertexState =
- ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, taskVertices.size());
+ ExecutionState jobVertexState = ExecutionJobVertex.getAggregateJobVertexState(
+ tasksPerState,
+ taskVertices.size());
+
final IOMetricsInfo jobVertexMetrics = new IOMetricsInfo(
counts.getNumBytesInLocal() + counts.getNumBytesInRemote(),
counts.isNumBytesInLocalComplete() && counts.isNumBytesInRemoteComplete(),
@@ -153,11 +159,18 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<
counts.getNumRecordsOut(),
counts.isNumRecordsOutComplete());
- Map<ExecutionState, Integer> statusCounts = new HashMap<>();
+ Map<ExecutionState, Integer> statusCounts = new HashMap<>(ExecutionState.values().length);
for (ExecutionState state : ExecutionState.values()) {
statusCounts.put(state, tasksPerState[state.ordinal()]);
}
- taskManagersInfoList.add(new JobVertexTaskManagersInfo.TaskManagersInfo(host, jobVertexState, startTime, endTime, duration, jobVertexMetrics, statusCounts));
+ taskManagersInfoList.add(new JobVertexTaskManagersInfo.TaskManagersInfo(
+ host,
+ jobVertexState,
+ startTime,
+ endTime,
+ duration,
+ jobVertexMetrics,
+ statusCounts));
}
return new JobVertexTaskManagersInfo(jobVertexID, jobVertex.getName(), now, taskManagersInfoList);
http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java
index 311d047..8424095 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java
@@ -36,6 +36,8 @@ public class JobVertexTaskManagersHeaders implements MessageHeaders<EmptyRequest
"/:" + JobVertexIdPathParameter.KEY +
"/taskmanagers";
+ private JobVertexTaskManagersHeaders() {}
+
@Override
public Class<EmptyRequestBody> getRequestClass() {
return EmptyRequestBody.class;
http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java
index fc30155..75ff570 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java
@@ -30,7 +30,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import java.util.List;
+import java.util.Collection;
import java.util.Map;
import java.util.Objects;
@@ -56,18 +56,18 @@ public class JobVertexTaskManagersInfo implements ResponseBody {
private final long now;
@JsonProperty(VERTEX_TASK_FIELD_TASK_MANAGERS)
- private List<TaskManagersInfo> taskManagers;
+ private Collection<TaskManagersInfo> taskManagerInfos;
@JsonCreator
public JobVertexTaskManagersInfo(
@JsonDeserialize(using = JobVertexIDDeserializer.class) @JsonProperty(VERTEX_TASK_FIELD_ID) JobVertexID jobVertexID,
@JsonProperty(VERTEX_TASK_FIELD_NAME) String name,
@JsonProperty(VERTEX_TASK_FIELD_NOW) long now,
- @JsonProperty(VERTEX_TASK_FIELD_TASK_MANAGERS) List<TaskManagersInfo> taskManagers) {
+ @JsonProperty(VERTEX_TASK_FIELD_TASK_MANAGERS) Collection<TaskManagersInfo> taskManagerInfos) {
this.jobVertexID = checkNotNull(jobVertexID);
this.name = checkNotNull(name);
this.now = now;
- this.taskManagers = checkNotNull(taskManagers);
+ this.taskManagerInfos = checkNotNull(taskManagerInfos);
}
@Override
@@ -82,12 +82,12 @@ public class JobVertexTaskManagersInfo implements ResponseBody {
return Objects.equals(jobVertexID, that.jobVertexID) &&
Objects.equals(name, that.name) &&
now == that.now &&
- Objects.equals(taskManagers, that.taskManagers);
+ Objects.equals(taskManagerInfos, that.taskManagerInfos);
}
@Override
public int hashCode() {
- return Objects.hash(jobVertexID, name, now, taskManagers);
+ return Objects.hash(jobVertexID, name, now, taskManagerInfos);
}
// ---------------------------------------------------
[2/6] flink git commit: [FLINK-8224] [flip6] Shutdown application
when job terminated in job mode
Posted by tr...@apache.org.
[FLINK-8224] [flip6] Shutdown application when job terminated in job mode
This closes #5139.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a4ecc7ff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a4ecc7ff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a4ecc7ff
Branch: refs/heads/master
Commit: a4ecc7ffe4ba16a68de06c1053c7916e6082b413
Parents: c1734f4
Author: shuai.xus <sh...@alibaba-inc.com>
Authored: Fri Dec 8 18:02:42 2017 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jan 25 15:33:29 2018 +0100
----------------------------------------------------------------------
.../clusterframework/MesosResourceManager.java | 4 +++-
.../entrypoint/JobClusterEntrypoint.java | 23 +++++++++++++++-----
.../resourcemanager/ResourceManager.java | 14 ++++++++----
.../StandaloneResourceManager.java | 6 +++--
.../resourcemanager/TestingResourceManager.java | 4 +++-
.../apache/flink/yarn/YarnResourceManager.java | 6 +++--
6 files changed, 42 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a4ecc7ff/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index cabb7d7..8b67257 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -75,6 +75,8 @@ import org.apache.mesos.SchedulerDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -371,7 +373,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
@Override
protected void shutDownApplication(
ApplicationStatus finalStatus,
- String optionalDiagnostics) throws ResourceManagerException {
+ @Nullable String optionalDiagnostics) throws ResourceManagerException {
LOG.info("Shutting down and unregistering as a Mesos framework.");
Exception exception = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/a4ecc7ff/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index cb1b086..ede8d13 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -53,6 +54,7 @@ import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedThrowable;
import akka.actor.ActorSystem;
@@ -258,8 +260,15 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
}
}
- private void shutDownAndTerminate(boolean cleanupHaData) {
+ private void shutDownAndTerminate(
+ boolean cleanupHaData,
+ ApplicationStatus status,
+ @Nullable String optionalDiagnostics) {
try {
+ if (resourceManager != null) {
+ resourceManager.shutDownCluster(status, optionalDiagnostics);
+ }
+
shutDown(cleanupHaData);
} catch (Throwable t) {
LOG.error("Could not properly shut down cluster entrypoint.", t);
@@ -292,23 +301,27 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
public void jobFinished(JobResult result) {
LOG.info("Job({}) finished.", jobId);
- shutDownAndTerminate(true);
+ shutDownAndTerminate(true, ApplicationStatus.SUCCEEDED, null);
}
@Override
public void jobFailed(JobResult result) {
checkArgument(result.getSerializedThrowable().isPresent());
- LOG.info("Job({}) failed.", jobId, result.getSerializedThrowable().get().getMessage());
+ final SerializedThrowable serializedThrowable = result.getSerializedThrowable().get();
+
+ final String errorMessage = serializedThrowable.getMessage();
+
+ LOG.info("Job({}) failed: {}.", jobId, errorMessage);
- shutDownAndTerminate(false);
+ shutDownAndTerminate(true, ApplicationStatus.FAILED, errorMessage);
}
@Override
public void jobFinishedByOther() {
LOG.info("Job({}) was finished by another JobManager.", jobId);
- shutDownAndTerminate(false);
+ shutDownAndTerminate(false, ApplicationStatus.UNKNOWN, "Job was finished by another master");
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a4ecc7ff/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index a0ff5f4..e5fef14 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -63,6 +63,8 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
+import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -479,10 +481,12 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
* Cleanup application and shut down cluster.
*
* @param finalStatus of the Flink application
- * @param optionalDiagnostics for the Flink application
+ * @param optionalDiagnostics diagnostics message for the Flink application or {@code null}
*/
@Override
- public void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics) {
+ public void shutDownCluster(
+ final ApplicationStatus finalStatus,
+ @Nullable final String optionalDiagnostics) {
log.info("Shut down cluster because application is in {}, diagnostics {}.", finalStatus, optionalDiagnostics);
try {
@@ -930,10 +934,12 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
* yet are returned.
*
* @param finalStatus The application status to report.
- * @param optionalDiagnostics An optional diagnostics message.
+ * @param optionalDiagnostics A diagnostics message or {@code null}.
* @throws ResourceManagerException if the application could not be shut down.
*/
- protected abstract void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) throws ResourceManagerException;
+ protected abstract void shutDownApplication(
+ ApplicationStatus finalStatus,
+ @Nullable String optionalDiagnostics) throws ResourceManagerException;
/**
* Allocates a resource using the resource profile.
http://git-wip-us.apache.org/repos/asf/flink/blob/a4ecc7ff/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index 624f31d..886a046 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -29,11 +29,13 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
+import javax.annotation.Nullable;
+
/**
* A standalone implementation of the resource manager. Used when the system is started in
* standalone mode (via scripts), rather than via a resource framework like YARN or Mesos.
*
- * This ResourceManager doesn't acquire new resources.
+ * <p>This ResourceManager doesn't acquire new resources.
*/
public class StandaloneResourceManager extends ResourceManager<ResourceID> {
@@ -67,7 +69,7 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
}
@Override
- protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
+ protected void shutDownApplication(ApplicationStatus finalStatus, @Nullable String optionalDiagnostics) {
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/a4ecc7ff/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
index 0d30822..2af024e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
@@ -29,6 +29,8 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
+import javax.annotation.Nullable;
+
/**
* Simple {@link ResourceManager} implementation for testing purposes.
*/
@@ -54,7 +56,7 @@ public class TestingResourceManager extends ResourceManager<ResourceID> {
}
@Override
- protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) throws ResourceManagerException {
+ protected void shutDownApplication(ApplicationStatus finalStatus, @Nullable String optionalDiagnostics) throws ResourceManagerException {
// noop
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a4ecc7ff/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 0fa0dda..910172d 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -252,11 +252,13 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
}
@Override
- protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
+ protected void shutDownApplication(
+ ApplicationStatus finalStatus,
+ @Nullable String optionalDiagnostics) {
// first, de-register from YARN
FinalApplicationStatus yarnStatus = getYarnStatus(finalStatus);
- log.info("Unregister application from the YARN Resource Manager");
+ log.info("Unregister application from the YARN Resource Manager with final status {}.", yarnStatus);
try {
resourceManagerClient.unregisterApplicationMaster(yarnStatus, optionalDiagnostics, "");
[4/6] flink git commit: [hotfix] Reorder imports in ResourceProfile according to checkstyle
Posted by tr...@apache.org.
[hotfix] Reorder imports in ResourceProfile according to checkstyle
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a354a9a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a354a9a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a354a9a7
Branch: refs/heads/master
Commit: a354a9a71a24316930f67963877cf28ea8279011
Parents: 47e6069
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jan 25 10:49:29 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jan 25 15:33:30 2018 +0100
----------------------------------------------------------------------
.../flink/runtime/clusterframework/types/ResourceProfile.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a354a9a7/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index 87d6fc5..8fbaed1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -18,8 +18,8 @@
package org.apache.flink.runtime.clusterframework.types;
-import org.apache.flink.api.common.resources.Resource;
import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.common.resources.Resource;
import javax.annotation.Nonnull;