You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/25 20:15:14 UTC

[1/6] flink git commit: [FLINK-8266] Add network memory to ResourceProfile for input/output memory of a task

Repository: flink
Updated Branches:
  refs/heads/master c1734f4bf -> 37b4e2cef


[FLINK-8266] Add network memory to ResourceProfile for input/output memory of a task

This closes #5170.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/47e6069d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/47e6069d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/47e6069d

Branch: refs/heads/master
Commit: 47e6069d7a299c02a81f062a7acb6a792b71c146
Parents: a4ecc7f
Author: shuai.xus <sh...@alibaba-inc.com>
Authored: Fri Dec 15 18:43:27 2017 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jan 25 15:33:29 2018 +0100

----------------------------------------------------------------------
 .../clusterframework/types/ResourceProfile.java | 36 +++++++++++++++---
 .../types/ResourceProfileTest.java              | 40 ++++++++++----------
 .../flink/yarn/YarnResourceManagerTest.java     |  2 +-
 3 files changed, 52 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/47e6069d/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index 6eb9af4..87d6fc5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -64,6 +64,9 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	/** How many native memory in mb are needed. */
 	private final int nativeMemoryInMB;
 
+	/** Memory used by the task in the slot to communicate with its upstreams and downstreams. Set by the job master. */
+	private final int networkMemoryInMB;
+
 	/** A extensible field for user specified resources from {@link ResourceSpec}. */
 	private final Map<String, Resource> extendedResources = new HashMap<>(1);
 
@@ -76,6 +79,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	 * @param heapMemoryInMB The size of the heap memory, in megabytes.
 	 * @param directMemoryInMB The size of the direct memory, in megabytes.
 	 * @param nativeMemoryInMB The size of the native memory, in megabytes.
+	 * @param networkMemoryInMB The size of the memory for input and output, in megabytes.
 	 * @param extendedResources The extended resources such as GPU and FPGA
 	 */
 	public ResourceProfile(
@@ -83,11 +87,13 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 			int heapMemoryInMB,
 			int directMemoryInMB,
 			int nativeMemoryInMB,
+			int networkMemoryInMB,
 			Map<String, Resource> extendedResources) {
 		this.cpuCores = cpuCores;
 		this.heapMemoryInMB = heapMemoryInMB;
 		this.directMemoryInMB = directMemoryInMB;
 		this.nativeMemoryInMB = nativeMemoryInMB;
+		this.networkMemoryInMB = networkMemoryInMB;
 		if (extendedResources != null) {
 			this.extendedResources.putAll(extendedResources);
 		}
@@ -100,7 +106,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	 * @param heapMemoryInMB The size of the heap memory, in megabytes.
 	 */
 	public ResourceProfile(double cpuCores, int heapMemoryInMB) {
-		this(cpuCores, heapMemoryInMB, 0, 0, Collections.EMPTY_MAP);
+		this(cpuCores, heapMemoryInMB, 0, 0, 0, Collections.EMPTY_MAP);
 	}
 
 	/**
@@ -109,7 +115,12 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	 * @param other The ResourceProfile to copy.
 	 */
 	public ResourceProfile(ResourceProfile other) {
-		this(other.cpuCores, other.heapMemoryInMB, other.directMemoryInMB, other.nativeMemoryInMB, other.extendedResources);
+		this(other.cpuCores,
+				other.heapMemoryInMB,
+				other.directMemoryInMB,
+				other.nativeMemoryInMB,
+				other.networkMemoryInMB,
+				other.extendedResources);
 	}
 
 	// ------------------------------------------------------------------------
@@ -151,12 +162,20 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	}
 
 	/**
+	 * Get the memory, in MB, needed for the task to communicate with its upstreams and downstreams.
+	 * @return The network memory in MB
+	 */
+	public int getNetworkMemoryInMB() {
+		return networkMemoryInMB;
+	}
+
+	/**
 	 * Get the total memory needed in MB.
 	 *
 	 * @return The total memory in MB
 	 */
 	public int getMemoryInMB() {
-		return heapMemoryInMB + directMemoryInMB + nativeMemoryInMB;
+		return heapMemoryInMB + directMemoryInMB + nativeMemoryInMB + networkMemoryInMB;
 	}
 
 	/**
@@ -187,7 +206,8 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 		if (cpuCores >= required.getCpuCores() &&
 				heapMemoryInMB >= required.getHeapMemoryInMB() &&
 				directMemoryInMB >= required.getDirectMemoryInMB() &&
-				nativeMemoryInMB >= required.getNativeMemoryInMB()) {
+				nativeMemoryInMB >= required.getNativeMemoryInMB() &&
+				networkMemoryInMB >= required.getNetworkMemoryInMB()) {
 			for (Map.Entry<String, Resource> resource : required.extendedResources.entrySet()) {
 				if (!extendedResources.containsKey(resource.getKey()) ||
 						!extendedResources.get(resource.getKey()).getResourceAggregateType().equals(resource.getValue().getResourceAggregateType()) ||
@@ -241,6 +261,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 		result = 31 * result + heapMemoryInMB;
 		result = 31 * result + directMemoryInMB;
 		result = 31 * result + nativeMemoryInMB;
+		result = 31 * result + networkMemoryInMB;
 		result = 31 * result + extendedResources.hashCode();
 		return result;
 	}
@@ -255,6 +276,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 			return this.cpuCores == that.cpuCores &&
 					this.heapMemoryInMB == that.heapMemoryInMB &&
 					this.directMemoryInMB == that.directMemoryInMB &&
+					this.networkMemoryInMB == that.networkMemoryInMB &&
 					Objects.equals(extendedResources, that.extendedResources);
 		}
 		return false;
@@ -270,11 +292,12 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 			"cpuCores=" + cpuCores +
 			", heapMemoryInMB=" + heapMemoryInMB +
 			", directMemoryInMB=" + directMemoryInMB +
-			", nativeMemoryInMB=" + nativeMemoryInMB + resources +
+			", nativeMemoryInMB=" + nativeMemoryInMB +
+			", networkMemoryInMB=" + networkMemoryInMB + resources +
 			'}';
 	}
 
-	static ResourceProfile fromResourceSpec(ResourceSpec resourceSpec) {
+	static ResourceProfile fromResourceSpec(ResourceSpec resourceSpec, int networkMemory) {
 		Map<String, Resource> copiedExtendedResources = new HashMap<>(resourceSpec.getExtendedResources());
 
 		return new ResourceProfile(
@@ -282,6 +305,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 				resourceSpec.getHeapMemory(),
 				resourceSpec.getDirectMemory(),
 				resourceSpec.getNativeMemory(),
+				networkMemory,
 				copiedExtendedResources);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/47e6069d/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
index 25cb5fb..7ed688a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
@@ -31,10 +31,10 @@ public class ResourceProfileTest {
 
 	@Test
 	public void testMatchRequirement() throws Exception {
-		ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, Collections.EMPTY_MAP);
-		ResourceProfile rp2 = new ResourceProfile(1.0, 200, 200, 200, Collections.EMPTY_MAP);
-		ResourceProfile rp3 = new ResourceProfile(2.0, 100, 100, 100, Collections.EMPTY_MAP);
-		ResourceProfile rp4 = new ResourceProfile(2.0, 200, 200, 200, Collections.EMPTY_MAP);
+		ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, 0, Collections.EMPTY_MAP);
+		ResourceProfile rp2 = new ResourceProfile(1.0, 200, 200, 200, 0, Collections.EMPTY_MAP);
+		ResourceProfile rp3 = new ResourceProfile(2.0, 100, 100, 100, 0, Collections.EMPTY_MAP);
+		ResourceProfile rp4 = new ResourceProfile(2.0, 200, 200, 200, 0, Collections.EMPTY_MAP);
 
 		assertFalse(rp1.isMatching(rp2));
 		assertTrue(rp2.isMatching(rp1));
@@ -50,6 +50,9 @@ public class ResourceProfileTest {
 		assertTrue(rp4.isMatching(rp3));
 		assertTrue(rp4.isMatching(rp4));
 
+		ResourceProfile rp5 = new ResourceProfile(2.0, 100, 100, 100, 100, null);
+		assertFalse(rp4.isMatching(rp5));
+
 		ResourceSpec rs1 = ResourceSpec.newBuilder().
 				setCpuCores(1.0).
 				setHeapMemoryInMB(100).
@@ -61,10 +64,9 @@ public class ResourceProfileTest {
 				setGPUResource(1.1).
 				build();
 
-
-		assertFalse(rp1.isMatching(ResourceProfile.fromResourceSpec(rs1)));
-		assertTrue(ResourceProfile.fromResourceSpec(rs1).isMatching(ResourceProfile.fromResourceSpec(rs2)));
-		assertFalse(ResourceProfile.fromResourceSpec(rs2).isMatching(ResourceProfile.fromResourceSpec(rs1)));
+		assertFalse(rp1.isMatching(ResourceProfile.fromResourceSpec(rs1, 0)));
+		assertTrue(ResourceProfile.fromResourceSpec(rs1, 0).isMatching(ResourceProfile.fromResourceSpec(rs2, 0)));
+		assertFalse(ResourceProfile.fromResourceSpec(rs2, 0).isMatching(ResourceProfile.fromResourceSpec(rs1, 0)));
 	}
 
 	@Test
@@ -76,7 +78,7 @@ public class ResourceProfileTest {
 	public void testEquals() throws Exception {
 		ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
 		ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
-		assertTrue(ResourceProfile.fromResourceSpec(rs1).equals(ResourceProfile.fromResourceSpec(rs2)));
+		assertTrue(ResourceProfile.fromResourceSpec(rs1, 0).equals(ResourceProfile.fromResourceSpec(rs2, 0)));
 
 		ResourceSpec rs3 = ResourceSpec.newBuilder().
 				setCpuCores(1.0).
@@ -88,37 +90,37 @@ public class ResourceProfileTest {
 				setHeapMemoryInMB(100).
 				setGPUResource(1.1).
 				build();
-		assertFalse(ResourceProfile.fromResourceSpec(rs3).equals(ResourceProfile.fromResourceSpec(rs4)));
+		assertFalse(ResourceProfile.fromResourceSpec(rs3, 0).equals(ResourceProfile.fromResourceSpec(rs4, 0)));
 
 		ResourceSpec rs5 = ResourceSpec.newBuilder().
 				setCpuCores(1.0).
 				setHeapMemoryInMB(100).
 				setGPUResource(2.2).
 				build();
-		assertTrue(ResourceProfile.fromResourceSpec(rs3).equals(ResourceProfile.fromResourceSpec(rs5)));
+		assertTrue(ResourceProfile.fromResourceSpec(rs3, 100).equals(ResourceProfile.fromResourceSpec(rs5, 100)));
 	}
 
 	@Test
 	public void testCompareTo() throws Exception {
 		ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
 		ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
-		assertEquals(0, ResourceProfile.fromResourceSpec(rs1).compareTo(ResourceProfile.fromResourceSpec(rs2)));
+		assertEquals(0, ResourceProfile.fromResourceSpec(rs1, 0).compareTo(ResourceProfile.fromResourceSpec(rs2, 0)));
 
 		ResourceSpec rs3 = ResourceSpec.newBuilder().
 				setCpuCores(1.0).
 				setHeapMemoryInMB(100).
 				setGPUResource(2.2).
 				build();
-		assertEquals(-1, ResourceProfile.fromResourceSpec(rs1).compareTo(ResourceProfile.fromResourceSpec(rs3)));
-		assertEquals(1, ResourceProfile.fromResourceSpec(rs3).compareTo(ResourceProfile.fromResourceSpec(rs1)));
+		assertEquals(-1, ResourceProfile.fromResourceSpec(rs1,  0).compareTo(ResourceProfile.fromResourceSpec(rs3, 0)));
+		assertEquals(1, ResourceProfile.fromResourceSpec(rs3, 0).compareTo(ResourceProfile.fromResourceSpec(rs1, 0)));
 
 		ResourceSpec rs4 = ResourceSpec.newBuilder().
 				setCpuCores(1.0).
 				setHeapMemoryInMB(100).
 				setGPUResource(1.1).
 				build();
-		assertEquals(1, ResourceProfile.fromResourceSpec(rs3).compareTo(ResourceProfile.fromResourceSpec(rs4)));
-		assertEquals(-1, ResourceProfile.fromResourceSpec(rs4).compareTo(ResourceProfile.fromResourceSpec(rs3)));
+		assertEquals(1, ResourceProfile.fromResourceSpec(rs3, 0).compareTo(ResourceProfile.fromResourceSpec(rs4, 0)));
+		assertEquals(-1, ResourceProfile.fromResourceSpec(rs4, 0).compareTo(ResourceProfile.fromResourceSpec(rs3, 0)));
 
 
 		ResourceSpec rs5 = ResourceSpec.newBuilder().
@@ -126,7 +128,7 @@ public class ResourceProfileTest {
 				setHeapMemoryInMB(100).
 				setGPUResource(2.2).
 				build();
-		assertEquals(0, ResourceProfile.fromResourceSpec(rs3).compareTo(ResourceProfile.fromResourceSpec(rs5)));
+		assertEquals(0, ResourceProfile.fromResourceSpec(rs3, 0).compareTo(ResourceProfile.fromResourceSpec(rs5, 0)));
 	}
 
 	@Test
@@ -136,10 +138,10 @@ public class ResourceProfileTest {
 				setHeapMemoryInMB(100).
 				setGPUResource(1.6).
 				build();
-		ResourceProfile rp = ResourceProfile.fromResourceSpec(rs);
+		ResourceProfile rp = ResourceProfile.fromResourceSpec(rs, 50);
 
 		assertEquals(1.0, rp.getCpuCores(), 0.000001);
-		assertEquals(100, rp.getMemoryInMB());
+		assertEquals(150, rp.getMemoryInMB());
 		assertEquals(100, rp.getOperatorsMemoryInMB());
 		assertEquals(1.6, rp.getExtendedResources().get(ResourceSpec.GPU_NAME).getValue(), 0.000001);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/47e6069d/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index cd08fb9..1e70169 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -347,7 +347,7 @@ public class YarnResourceManagerTest extends TestLogger {
 			final SlotReport slotReport = new SlotReport(
 				new SlotStatus(
 					new SlotID(taskManagerResourceId, 1),
-					new ResourceProfile(10, 1, 1, 1, Collections.emptyMap())));
+					new ResourceProfile(10, 1, 1, 1, 0, Collections.emptyMap())));
 
 			CompletableFuture<Integer> numberRegisteredSlotsFuture = rmGateway
 				.registerTaskExecutor(


[3/6] flink git commit: [hotfix] [qs] Combine logging statements in QueryableStateUtils#createKvStateServer

Posted by tr...@apache.org.
[hotfix] [qs] Combine logging statements in QueryableStateUtils#createKvStateServer


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5757942d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5757942d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5757942d

Branch: refs/heads/master
Commit: 5757942d598afa4b2df6a0beb8da38052dbbee10
Parents: a354a9a
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jan 25 10:56:35 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jan 25 15:33:30 2018 +0100

----------------------------------------------------------------------
 .../runtime/query/QueryableStateUtils.java      | 24 +++++++++++++-------
 1 file changed, 16 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5757942d/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
index adbe15d..c3f83447 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
@@ -73,10 +73,14 @@ public final class QueryableStateUtils {
 					KvStateRequestStats.class);
 			return constructor.newInstance(address, ports, eventLoopThreads, queryThreads, stats);
 		} catch (ClassNotFoundException e) {
-			LOG.warn("Could not load Queryable State Client Proxy. " +
-					"Probable reason: flink-queryable-state-runtime is not in the classpath. " +
-					"Please put the corresponding jar from the opt to the lib folder.");
-			LOG.debug("Caught exception", e);
+			final String msg = "Could not load Queryable State Client Proxy. " +
+				"Probable reason: flink-queryable-state-runtime is not in the classpath. " +
+				"Please put the corresponding jar from the opt to the lib folder.";
+			if (LOG.isDebugEnabled()) {
+				LOG.debug(msg, e);
+			} else {
+				LOG.info(msg);
+			}
 			return null;
 		} catch (InvocationTargetException e) {
 			LOG.error("Queryable State Client Proxy could not be created: ", e.getTargetException());
@@ -128,10 +132,14 @@ public final class QueryableStateUtils {
 					KvStateRequestStats.class);
 			return constructor.newInstance(address, ports, eventLoopThreads, queryThreads, kvStateRegistry, stats);
 		} catch (ClassNotFoundException e) {
-			LOG.warn("Could not load Queryable State Server. " +
-					"Probable reason: flink-queryable-state-runtime is not in the classpath. " +
-					"Please put the corresponding jar from the opt to the lib folder.");
-			LOG.debug("Caught exception", e);
+			final String msg = "Could not load Queryable State Server. " +
+				"Probable reason: flink-queryable-state-runtime is not in the classpath. " +
+				"Please put the corresponding jar from the opt to the lib folder.";
+			if (LOG.isDebugEnabled()) {
+				LOG.debug(msg, e);
+			} else {
+				LOG.info(msg);
+			}
 			return null;
 		} catch (InvocationTargetException e) {
 			LOG.error("Queryable State Server could not be created: ", e.getTargetException());


[5/6] flink git commit: [FLINK-7858][flip6] Port JobVertexTaskManagersHandler to REST endpoint

Posted by tr...@apache.org.
[FLINK-7858][flip6] Port JobVertexTaskManagersHandler to REST endpoint

This closes #5149.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/056c72af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/056c72af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/056c72af

Branch: refs/heads/master
Commit: 056c72af994bc0b7bd838faff6b2991763fc2ac1
Parents: 5757942
Author: zjureel <zj...@gmail.com>
Authored: Tue Dec 19 16:56:50 2017 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jan 25 15:33:30 2018 +0100

----------------------------------------------------------------------
 .../job/JobVertexTaskManagersHandler.java       | 165 ++++++++++++++++++
 .../messages/JobVertexTaskManagersHeaders.java  |  72 ++++++++
 .../messages/JobVertexTaskManagersInfo.java     | 171 +++++++++++++++++++
 .../runtime/webmonitor/WebMonitorEndpoint.java  |  13 ++
 .../messages/JobVertexTaskManagersInfoTest.java |  65 +++++++
 5 files changed, 486 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/056c72af/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
new file mode 100644
index 0000000..9b59e8d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobVertexTaskManagersInfo;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler for the job vertex task managers.
+ */
+public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<JobVertexTaskManagersInfo, JobVertexMessageParameters> {
+	private MetricFetcher<?> metricFetcher;
+
+	public JobVertexTaskManagersHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			Time timeout,
+			Map<String, String> responseHeaders,
+			MessageHeaders<EmptyRequestBody, JobVertexTaskManagersInfo, JobVertexMessageParameters> messageHeaders,
+			ExecutionGraphCache executionGraphCache,
+			Executor executor,
+			MetricFetcher<?> metricFetcher) {
+		super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
+		this.metricFetcher = metricFetcher;
+	}
+
+	@Override
+	protected JobVertexTaskManagersInfo handleRequest(
+			HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> request,
+			AccessExecutionGraph executionGraph) throws RestHandlerException {
+		JobID jobID = request.getPathParameter(JobIDPathParameter.class);
+		JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);
+		AccessExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexID);
+
+		// Build a map that groups tasks by TaskManager
+		Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>();
+		for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
+			TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
+			String taskManager = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort();
+			List<AccessExecutionVertex> vertices = taskManagerVertices.get(taskManager);
+			if (vertices == null) {
+				vertices = new ArrayList<>();
+				taskManagerVertices.put(taskManager, vertices);
+			}
+
+			vertices.add(vertex);
+		}
+
+		final long now = System.currentTimeMillis();
+
+		List<JobVertexTaskManagersInfo.TaskManagersInfo> taskManagersInfoList = new ArrayList<>();
+		for (Map.Entry<String, List<AccessExecutionVertex>> entry : taskManagerVertices.entrySet()) {
+			String host = entry.getKey();
+			List<AccessExecutionVertex> taskVertices = entry.getValue();
+
+			int[] tasksPerState = new int[ExecutionState.values().length];
+
+			long startTime = Long.MAX_VALUE;
+			long endTime = 0;
+			boolean allFinished = true;
+
+			MutableIOMetrics counts = new MutableIOMetrics();
+
+			for (AccessExecutionVertex vertex : taskVertices) {
+				final ExecutionState state = vertex.getExecutionState();
+				tasksPerState[state.ordinal()]++;
+
+				// take the earliest start time
+				long started = vertex.getStateTimestamp(ExecutionState.DEPLOYING);
+				if (started > 0) {
+					startTime = Math.min(startTime, started);
+				}
+
+				allFinished &= state.isTerminal();
+				endTime = Math.max(endTime, vertex.getStateTimestamp(state));
+
+				counts.addIOMetrics(
+					vertex.getCurrentExecutionAttempt(),
+					metricFetcher,
+					jobID.toString(),
+					jobVertex.getJobVertexId().toString());
+			}
+
+			long duration;
+			if (startTime < Long.MAX_VALUE) {
+				if (allFinished) {
+					duration = endTime - startTime;
+				}
+				else {
+					endTime = -1L;
+					duration = now - startTime;
+				}
+			}
+			else {
+				startTime = -1L;
+				endTime = -1L;
+				duration = -1L;
+			}
+
+			ExecutionState jobVertexState =
+				ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, taskVertices.size());
+			final IOMetricsInfo jobVertexMetrics = new IOMetricsInfo(
+				counts.getNumBytesInLocal() + counts.getNumBytesInRemote(),
+				counts.isNumBytesInLocalComplete() && counts.isNumBytesInRemoteComplete(),
+				counts.getNumBytesOut(),
+				counts.isNumBytesOutComplete(),
+				counts.getNumRecordsIn(),
+				counts.isNumRecordsInComplete(),
+				counts.getNumRecordsOut(),
+				counts.isNumRecordsOutComplete());
+
+			Map<ExecutionState, Integer> statusCounts = new HashMap<>();
+			for (ExecutionState state : ExecutionState.values()) {
+				statusCounts.put(state, tasksPerState[state.ordinal()]);
+			}
+			taskManagersInfoList.add(new JobVertexTaskManagersInfo.TaskManagersInfo(host, jobVertexState, startTime, endTime, duration, jobVertexMetrics, statusCounts));
+		}
+
+		return new JobVertexTaskManagersInfo(jobVertexID, jobVertex.getName(), now, taskManagersInfoList);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/056c72af/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java
new file mode 100644
index 0000000..311d047
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.JobVertexTaskManagersHandler;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link JobVertexTaskManagersHandler}.
+ */
+public class JobVertexTaskManagersHeaders implements MessageHeaders<EmptyRequestBody, JobVertexTaskManagersInfo, JobVertexMessageParameters> {
+
+	private static final JobVertexTaskManagersHeaders INSTANCE = new JobVertexTaskManagersHeaders();
+
+	public static final String URL = "/jobs" +
+		"/:" + JobIDPathParameter.KEY +
+		"/vertices" +
+		"/:" + JobVertexIdPathParameter.KEY +
+		"/taskmanagers";
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public Class<JobVertexTaskManagersInfo> getResponseClass() {
+		return JobVertexTaskManagersInfo.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public JobVertexMessageParameters getUnresolvedMessageParameters() {
+		return new JobVertexMessageParameters();
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return URL;
+	}
+
+	public static JobVertexTaskManagersHeaders getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/056c72af/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java
new file mode 100644
index 0000000..fc30155
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.job.JobVertexTaskManagersHandler;
+import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Response type of the {@link JobVertexTaskManagersHandler}.
+ */
+public class JobVertexTaskManagersInfo implements ResponseBody {
+	public static final String VERTEX_TASK_FIELD_ID = "id";
+	public static final String VERTEX_TASK_FIELD_NAME = "name";
+	public static final String VERTEX_TASK_FIELD_NOW = "now";
+	public static final String VERTEX_TASK_FIELD_TASK_MANAGERS = "taskmanagers";
+
+	@JsonProperty(VERTEX_TASK_FIELD_ID)
+	@JsonSerialize(using = JobVertexIDSerializer.class)
+	private final JobVertexID jobVertexID;
+
+	@JsonProperty(VERTEX_TASK_FIELD_NAME)
+	private final String name;
+
+	@JsonProperty(VERTEX_TASK_FIELD_NOW)
+	private final long now;
+
+	@JsonProperty(VERTEX_TASK_FIELD_TASK_MANAGERS)
+	private List<TaskManagersInfo> taskManagers;
+
+	@JsonCreator
+	public JobVertexTaskManagersInfo(
+			@JsonDeserialize(using = JobVertexIDDeserializer.class) @JsonProperty(VERTEX_TASK_FIELD_ID) JobVertexID jobVertexID,
+			@JsonProperty(VERTEX_TASK_FIELD_NAME) String name,
+			@JsonProperty(VERTEX_TASK_FIELD_NOW) long now,
+			@JsonProperty(VERTEX_TASK_FIELD_TASK_MANAGERS) List<TaskManagersInfo> taskManagers) {
+		this.jobVertexID = checkNotNull(jobVertexID);
+		this.name = checkNotNull(name);
+		this.now = now;
+		this.taskManagers = checkNotNull(taskManagers);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		JobVertexTaskManagersInfo that = (JobVertexTaskManagersInfo) o;
+		return Objects.equals(jobVertexID, that.jobVertexID) &&
+			Objects.equals(name, that.name) &&
+			now == that.now &&
+			Objects.equals(taskManagers, that.taskManagers);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(jobVertexID, name, now, taskManagers);
+	}
+
+	// ---------------------------------------------------
+	// Static inner classes
+	// ---------------------------------------------------
+
+	/**
+	 * Detailed information about task managers.
+	 */
+	public static class TaskManagersInfo {
+		public static final String TASK_MANAGERS_FIELD_HOST = "host";
+		public static final String TASK_MANAGERS_FIELD_STATUS = "status";
+		public static final String TASK_MANAGERS_FIELD_START_TIME = "start-time";
+		public static final String TASK_MANAGERS_FIELD_END_TIME = "end-time";
+		public static final String TASK_MANAGERS_FIELD_DURATION = "duration";
+		public static final String TASK_MANAGERS_FIELD_METRICS = "metrics";
+		public static final String TASK_MANAGERS_FIELD_STATUS_COUNTS = "status-counts";
+
+		@JsonProperty(TASK_MANAGERS_FIELD_HOST)
+		private final String host;
+
+		@JsonProperty(TASK_MANAGERS_FIELD_STATUS)
+		private final ExecutionState status;
+
+		@JsonProperty(TASK_MANAGERS_FIELD_START_TIME)
+		private final long startTime;
+
+		@JsonProperty(TASK_MANAGERS_FIELD_END_TIME)
+		private final long endTime;
+
+		@JsonProperty(TASK_MANAGERS_FIELD_DURATION)
+		private final long duration;
+
+		@JsonProperty(TASK_MANAGERS_FIELD_METRICS)
+		private final IOMetricsInfo metrics;
+
+		@JsonProperty(TASK_MANAGERS_FIELD_STATUS_COUNTS)
+		private final Map<ExecutionState, Integer> statusCounts;
+
+		@JsonCreator
+		public TaskManagersInfo(
+				@JsonProperty(TASK_MANAGERS_FIELD_HOST) String host,
+				@JsonProperty(TASK_MANAGERS_FIELD_STATUS) ExecutionState status,
+				@JsonProperty(TASK_MANAGERS_FIELD_START_TIME) long startTime,
+				@JsonProperty(TASK_MANAGERS_FIELD_END_TIME) long endTime,
+				@JsonProperty(TASK_MANAGERS_FIELD_DURATION) long duration,
+				@JsonProperty(TASK_MANAGERS_FIELD_METRICS) IOMetricsInfo metrics,
+				@JsonProperty(TASK_MANAGERS_FIELD_STATUS_COUNTS) Map<ExecutionState, Integer> statusCounts) {
+			this.host = checkNotNull(host);
+			this.status = checkNotNull(status);
+			this.startTime = startTime;
+			this.endTime = endTime;
+			this.duration = duration;
+			this.metrics = checkNotNull(metrics);
+			this.statusCounts = checkNotNull(statusCounts);
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			TaskManagersInfo that = (TaskManagersInfo) o;
+			return Objects.equals(host, that.host) &&
+				Objects.equals(status, that.status) &&
+				startTime == that.startTime &&
+				endTime == that.endTime &&
+				duration == that.duration &&
+				Objects.equals(metrics, that.metrics) &&
+				Objects.equals(statusCounts, that.statusCounts);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(host, status, startTime, endTime, duration, metrics, statusCounts);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/056c72af/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index e432752..30a68d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler;
 import org.apache.flink.runtime.rest.handler.job.JobIdsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
 import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
+import org.apache.flink.runtime.rest.handler.job.JobVertexTaskManagersHandler;
 import org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler;
 import org.apache.flink.runtime.rest.handler.job.SubtaskCurrentAttemptDetailsHandler;
 import org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptAccumulatorsHandler;
@@ -69,6 +70,7 @@ import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
 import org.apache.flink.runtime.rest.messages.JobIdsWithStatusesOverviewHeaders;
 import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
 import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsHeaders;
+import org.apache.flink.runtime.rest.messages.JobVertexTaskManagersHeaders;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
 import org.apache.flink.runtime.rest.messages.SubtasksTimesHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
@@ -350,6 +352,16 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 			responseHeaders,
 			metricFetcher);
 
+		final JobVertexTaskManagersHandler jobVertexTaskManagersHandler = new JobVertexTaskManagersHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			JobVertexTaskManagersHeaders.getInstance(),
+			executionGraphCache,
+			executor,
+			metricFetcher);
+
 		final JobExecutionResultHandler jobExecutionResultHandler = new JobExecutionResultHandler(
 			restAddressFuture,
 			leaderRetriever,
@@ -446,6 +458,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 		handlers.add(Tuple2.of(SubtaskExecutionAttemptDetailsHeaders.getInstance(), subtaskExecutionAttemptDetailsHandler));
 		handlers.add(Tuple2.of(SubtaskExecutionAttemptAccumulatorsHeaders.getInstance(), subtaskExecutionAttemptAccumulatorsHandler));
 		handlers.add(Tuple2.of(SubtaskCurrentAttemptDetailsHeaders.getInstance(), subtaskCurrentAttemptDetailsHandler));
+		handlers.add(Tuple2.of(JobVertexTaskManagersHeaders.getInstance(), jobVertexTaskManagersHandler));
 
 		// This handler MUST be added last, as it otherwise masks all subsequent GET handlers
 		optWebContent.ifPresent(

http://git-wip-us.apache.org/repos/asf/flink/blob/056c72af/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfoTest.java
new file mode 100644
index 0000000..1a7b521
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfoTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import static org.apache.flink.runtime.rest.messages.JobVertexTaskManagersInfo.TaskManagersInfo;
+
+/**
+ * Tests that the {@link JobVertexTaskManagersInfo} can be marshalled and unmarshalled.
+ */
+public class JobVertexTaskManagersInfoTest extends RestResponseMarshallingTestBase<JobVertexTaskManagersInfo> {
+	@Override
+	protected Class<JobVertexTaskManagersInfo> getTestResponseClass() {
+		return JobVertexTaskManagersInfo.class;
+	}
+
+	@Override
+	protected JobVertexTaskManagersInfo getTestResponseInstance() throws Exception {
+		final Random random = new Random();
+		List<TaskManagersInfo> taskManagersInfoList = new ArrayList<>();
+
+		final Map<ExecutionState, Integer> statusCounts = new HashMap<>(ExecutionState.values().length);
+		final IOMetricsInfo jobVertexMetrics = new IOMetricsInfo(
+			random.nextLong(),
+			random.nextBoolean(),
+			random.nextLong(),
+			random.nextBoolean(),
+			random.nextLong(),
+			random.nextBoolean(),
+			random.nextLong(),
+			random.nextBoolean());
+		int count = 100;
+		for (ExecutionState executionState : ExecutionState.values()) {
+			statusCounts.put(executionState, count++);
+		}
+		taskManagersInfoList.add(new TaskManagersInfo("host1", ExecutionState.CANCELING, 1L, 2L, 3L, jobVertexMetrics, statusCounts));
+
+		return new JobVertexTaskManagersInfo(new JobVertexID(), "test", System.currentTimeMillis(), taskManagersInfoList);
+	}
+}


[6/6] flink git commit: [FLINK-7858][flip6] Return with HTTP 404 if job or jobvertex are unknown

Posted by tr...@apache.org.
[FLINK-7858][flip6] Return with HTTP 404 if job or jobvertex are unknown

Annotate AccessExecutionGraph#getJobVertex(JobVertexID) with @Nullable.
Throw NotFoundException in JobVertexTaskManagersHandler if jobvertexId is unknown.
Throw NotFoundException in AbstractExecutionGraphHandler if jobId is unknown.
Copy Javadoc from legacy JobVertexTaskManagersHandler.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/37b4e2ce
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/37b4e2ce
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/37b4e2ce

Branch: refs/heads/master
Commit: 37b4e2cef687160f2bc7cedb7d2360825089569e
Parents: 056c72a
Author: gyao <ga...@data-artisans.com>
Authored: Wed Jan 24 12:24:35 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jan 25 15:55:54 2018 +0100

----------------------------------------------------------------------
 .../executiongraph/AccessExecutionGraph.java    |  3 +-
 .../flink/runtime/rest/NotFoundException.java   |  4 ++
 .../job/AbstractExecutionGraphHandler.java      | 15 ++++++-
 .../job/JobVertexTaskManagersHandler.java       | 41 +++++++++++++-------
 .../messages/JobVertexTaskManagersHeaders.java  |  2 +
 .../messages/JobVertexTaskManagersInfo.java     | 12 +++---
 6 files changed, 54 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
index 362afa1..8d1fa1d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
@@ -76,8 +76,9 @@ public interface AccessExecutionGraph {
 	 * Returns the job vertex for the given {@link JobVertexID}.
 	 *
 	 * @param id id of job vertex to be returned
-	 * @return job vertex for the given id, or null
+	 * @return job vertex for the given id, or {@code null}
 	 */
+	@Nullable
 	AccessExecutionJobVertex getJobVertex(JobVertexID id);
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java
index 50060b0..f9db334 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java
@@ -33,4 +33,8 @@ public class NotFoundException extends RestHandlerException {
 	public NotFoundException(String message) {
 		super(message, HttpResponseStatus.NOT_FOUND);
 	}
+
+	public NotFoundException(String message, Throwable cause) {
+		super(message, HttpResponseStatus.NOT_FOUND, cause);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
index 7192832..7c42af1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.rest.handler.job;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.rest.NotFoundException;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
@@ -32,6 +34,7 @@ import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
@@ -79,8 +82,16 @@ public abstract class AbstractExecutionGraphHandler<R extends ResponseBody, M ex
 				} catch (RestHandlerException rhe) {
 					throw new CompletionException(rhe);
 				}
-			},
-			executor);
+			}, executor)
+			.exceptionally(throwable -> {
+				throwable = ExceptionUtils.stripCompletionException(throwable);
+				if (throwable instanceof FlinkJobNotFoundException) {
+					throw new CompletionException(
+						new NotFoundException(String.format("Job %s not found", jobId), throwable));
+				} else {
+					throw new CompletionException(throwable);
+				}
+			});
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
index 9b59e8d..24650a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.NotFoundException;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
@@ -41,6 +42,7 @@ import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -50,7 +52,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
 /**
- * Request handler for the job vertex task managers.
+ * A request handler that provides the details of a job vertex, including id, name, and the
+ * runtime and metrics of all its subtasks aggregated by TaskManager.
  */
 public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<JobVertexTaskManagersInfo, JobVertexMessageParameters> {
 	private MetricFetcher<?> metricFetcher;
@@ -65,7 +68,7 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<
 			Executor executor,
 			MetricFetcher<?> metricFetcher) {
 		super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
-		this.metricFetcher = metricFetcher;
+		this.metricFetcher = Preconditions.checkNotNull(metricFetcher);
 	}
 
 	@Override
@@ -76,23 +79,24 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<
 		JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);
 		AccessExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexID);
 
+		if (jobVertex == null) {
+			throw new NotFoundException(String.format("JobVertex %s not found", jobVertexID));
+		}
+
 		// Build a map that groups tasks by TaskManager
 		Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>();
 		for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
 			TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
-			String taskManager = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort();
-			List<AccessExecutionVertex> vertices = taskManagerVertices.get(taskManager);
-			if (vertices == null) {
-				vertices = new ArrayList<>();
-				taskManagerVertices.put(taskManager, vertices);
-			}
-
+			String taskManager = location == null ? "(unassigned)" : location.getHostname() + ':' + location.dataPort();
+			List<AccessExecutionVertex> vertices = taskManagerVertices.computeIfAbsent(
+				taskManager,
+				ignored -> new ArrayList<>(4));
 			vertices.add(vertex);
 		}
 
 		final long now = System.currentTimeMillis();
 
-		List<JobVertexTaskManagersInfo.TaskManagersInfo> taskManagersInfoList = new ArrayList<>();
+		List<JobVertexTaskManagersInfo.TaskManagersInfo> taskManagersInfoList = new ArrayList<>(4);
 		for (Map.Entry<String, List<AccessExecutionVertex>> entry : taskManagerVertices.entrySet()) {
 			String host = entry.getKey();
 			List<AccessExecutionVertex> taskVertices = entry.getValue();
@@ -141,8 +145,10 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<
 				duration = -1L;
 			}
 
-			ExecutionState jobVertexState =
-				ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, taskVertices.size());
+			ExecutionState jobVertexState = ExecutionJobVertex.getAggregateJobVertexState(
+				tasksPerState,
+				taskVertices.size());
+
 			final IOMetricsInfo jobVertexMetrics = new IOMetricsInfo(
 				counts.getNumBytesInLocal() + counts.getNumBytesInRemote(),
 				counts.isNumBytesInLocalComplete() && counts.isNumBytesInRemoteComplete(),
@@ -153,11 +159,18 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<
 				counts.getNumRecordsOut(),
 				counts.isNumRecordsOutComplete());
 
-			Map<ExecutionState, Integer> statusCounts = new HashMap<>();
+			Map<ExecutionState, Integer> statusCounts = new HashMap<>(ExecutionState.values().length);
 			for (ExecutionState state : ExecutionState.values()) {
 				statusCounts.put(state, tasksPerState[state.ordinal()]);
 			}
-			taskManagersInfoList.add(new JobVertexTaskManagersInfo.TaskManagersInfo(host, jobVertexState, startTime, endTime, duration, jobVertexMetrics, statusCounts));
+			taskManagersInfoList.add(new JobVertexTaskManagersInfo.TaskManagersInfo(
+				host,
+				jobVertexState,
+				startTime,
+				endTime,
+				duration,
+				jobVertexMetrics,
+				statusCounts));
 		}
 
 		return new JobVertexTaskManagersInfo(jobVertexID, jobVertex.getName(), now, taskManagersInfoList);

http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java
index 311d047..8424095 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java
@@ -36,6 +36,8 @@ public class JobVertexTaskManagersHeaders implements MessageHeaders<EmptyRequest
 		"/:" + JobVertexIdPathParameter.KEY +
 		"/taskmanagers";
 
+	private JobVertexTaskManagersHeaders() {}
+
 	@Override
 	public Class<EmptyRequestBody> getRequestClass() {
 		return EmptyRequestBody.class;

http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java
index fc30155..75ff570 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java
@@ -30,7 +30,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
-import java.util.List;
+import java.util.Collection;
 import java.util.Map;
 import java.util.Objects;
 
@@ -56,18 +56,18 @@ public class JobVertexTaskManagersInfo implements ResponseBody {
 	private final long now;
 
 	@JsonProperty(VERTEX_TASK_FIELD_TASK_MANAGERS)
-	private List<TaskManagersInfo> taskManagers;
+	private Collection<TaskManagersInfo> taskManagerInfos;
 
 	@JsonCreator
 	public JobVertexTaskManagersInfo(
 			@JsonDeserialize(using = JobVertexIDDeserializer.class) @JsonProperty(VERTEX_TASK_FIELD_ID) JobVertexID jobVertexID,
 			@JsonProperty(VERTEX_TASK_FIELD_NAME) String name,
 			@JsonProperty(VERTEX_TASK_FIELD_NOW) long now,
-			@JsonProperty(VERTEX_TASK_FIELD_TASK_MANAGERS) List<TaskManagersInfo> taskManagers) {
+			@JsonProperty(VERTEX_TASK_FIELD_TASK_MANAGERS) Collection<TaskManagersInfo> taskManagerInfos) {
 		this.jobVertexID = checkNotNull(jobVertexID);
 		this.name = checkNotNull(name);
 		this.now = now;
-		this.taskManagers = checkNotNull(taskManagers);
+		this.taskManagerInfos = checkNotNull(taskManagerInfos);
 	}
 
 	@Override
@@ -82,12 +82,12 @@ public class JobVertexTaskManagersInfo implements ResponseBody {
 		return Objects.equals(jobVertexID, that.jobVertexID) &&
 			Objects.equals(name, that.name) &&
 			now == that.now &&
-			Objects.equals(taskManagers, that.taskManagers);
+			Objects.equals(taskManagerInfos, that.taskManagerInfos);
 	}
 
 	@Override
 	public int hashCode() {
-		return Objects.hash(jobVertexID, name, now, taskManagers);
+		return Objects.hash(jobVertexID, name, now, taskManagerInfos);
 	}
 
 	// ---------------------------------------------------


[2/6] flink git commit: [FLINK-8224] [flip6] Shutdown application when job terminated in job mode

Posted by tr...@apache.org.
[FLINK-8224] [flip6] Shutdown application when job terminated in job mode

This closes #5139.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a4ecc7ff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a4ecc7ff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a4ecc7ff

Branch: refs/heads/master
Commit: a4ecc7ffe4ba16a68de06c1053c7916e6082b413
Parents: c1734f4
Author: shuai.xus <sh...@alibaba-inc.com>
Authored: Fri Dec 8 18:02:42 2017 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jan 25 15:33:29 2018 +0100

----------------------------------------------------------------------
 .../clusterframework/MesosResourceManager.java  |  4 +++-
 .../entrypoint/JobClusterEntrypoint.java        | 23 +++++++++++++++-----
 .../resourcemanager/ResourceManager.java        | 14 ++++++++----
 .../StandaloneResourceManager.java              |  6 +++--
 .../resourcemanager/TestingResourceManager.java |  4 +++-
 .../apache/flink/yarn/YarnResourceManager.java  |  6 +++--
 6 files changed, 42 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a4ecc7ff/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index cabb7d7..8b67257 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -75,6 +75,8 @@ import org.apache.mesos.SchedulerDriver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -371,7 +373,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 	@Override
 	protected void shutDownApplication(
 			ApplicationStatus finalStatus,
-			String optionalDiagnostics) throws ResourceManagerException {
+			@Nullable String optionalDiagnostics) throws ResourceManagerException {
 		LOG.info("Shutting down and unregistering as a Mesos framework.");
 
 		Exception exception = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ecc7ff/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index cb1b086..ede8d13 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -53,6 +54,7 @@ import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedThrowable;
 
 import akka.actor.ActorSystem;
 
@@ -258,8 +260,15 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 		}
 	}
 
-	private void shutDownAndTerminate(boolean cleanupHaData) {
+	private void shutDownAndTerminate(
+			boolean cleanupHaData,
+			ApplicationStatus status,
+			@Nullable String optionalDiagnostics) {
 		try {
+			if (resourceManager != null) {
+				resourceManager.shutDownCluster(status, optionalDiagnostics);
+			}
+
 			shutDown(cleanupHaData);
 		} catch (Throwable t) {
 			LOG.error("Could not properly shut down cluster entrypoint.", t);
@@ -292,23 +301,27 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 		public void jobFinished(JobResult result) {
 			LOG.info("Job({}) finished.", jobId);
 
-			shutDownAndTerminate(true);
+			shutDownAndTerminate(true, ApplicationStatus.SUCCEEDED, null);
 		}
 
 		@Override
 		public void jobFailed(JobResult result) {
 			checkArgument(result.getSerializedThrowable().isPresent());
 
-			LOG.info("Job({}) failed.", jobId, result.getSerializedThrowable().get().getMessage());
+			final SerializedThrowable serializedThrowable = result.getSerializedThrowable().get();
+
+			final String errorMessage = serializedThrowable.getMessage();
+
+			LOG.info("Job({}) failed: {}.", jobId, errorMessage);
 
-			shutDownAndTerminate(false);
+			shutDownAndTerminate(true, ApplicationStatus.FAILED, errorMessage);
 		}
 
 		@Override
 		public void jobFinishedByOther() {
 			LOG.info("Job({}) was finished by another JobManager.", jobId);
 
-			shutDownAndTerminate(false);
+			shutDownAndTerminate(false, ApplicationStatus.UNKNOWN, "Job was finished by another master");
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ecc7ff/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index a0ff5f4..e5fef14 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -63,6 +63,8 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -479,10 +481,12 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	 * Cleanup application and shut down cluster.
 	 *
 	 * @param finalStatus of the Flink application
-	 * @param optionalDiagnostics for the Flink application
+	 * @param optionalDiagnostics diagnostics message for the Flink application or {@code null}
 	 */
 	@Override
-	public void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics) {
+	public void shutDownCluster(
+			final ApplicationStatus finalStatus,
+			@Nullable final String optionalDiagnostics) {
 		log.info("Shut down cluster because application is in {}, diagnostics {}.", finalStatus, optionalDiagnostics);
 
 		try {
@@ -930,10 +934,12 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	 * yet are returned.
 	 *
 	 * @param finalStatus The application status to report.
-	 * @param optionalDiagnostics An optional diagnostics message.
+	 * @param optionalDiagnostics A diagnostics message or {@code null}.
 	 * @throws ResourceManagerException if the application could not be shut down.
 	 */
-	protected abstract void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) throws ResourceManagerException;
+	protected abstract void shutDownApplication(
+		ApplicationStatus finalStatus,
+		@Nullable String optionalDiagnostics) throws ResourceManagerException;
 
 	/**
 	 * Allocates a resource using the resource profile.

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ecc7ff/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index 624f31d..886a046 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -29,11 +29,13 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
+import javax.annotation.Nullable;
+
 /**
  * A standalone implementation of the resource manager. Used when the system is started in
  * standalone mode (via scripts), rather than via a resource framework like YARN or Mesos.
  *
- * This ResourceManager doesn't acquire new resources.
+ * <p>This ResourceManager doesn't acquire new resources.
  */
 public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 
@@ -67,7 +69,7 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 	}
 
 	@Override
-	protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
+	protected void shutDownApplication(ApplicationStatus finalStatus, @Nullable String optionalDiagnostics) {
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ecc7ff/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
index 0d30822..2af024e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
@@ -29,6 +29,8 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
+import javax.annotation.Nullable;
+
 /**
  * Simple {@link ResourceManager} implementation for testing purposes.
  */
@@ -54,7 +56,7 @@ public class TestingResourceManager extends ResourceManager<ResourceID> {
 	}
 
 	@Override
-	protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) throws ResourceManagerException {
+	protected void shutDownApplication(ApplicationStatus finalStatus, @Nullable String optionalDiagnostics) throws ResourceManagerException {
 		// noop
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ecc7ff/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 0fa0dda..910172d 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -252,11 +252,13 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 	}
 
 	@Override
-	protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
+	protected void shutDownApplication(
+		ApplicationStatus finalStatus,
+		@Nullable String optionalDiagnostics) {
 
 		// first, de-register from YARN
 		FinalApplicationStatus yarnStatus = getYarnStatus(finalStatus);
-		log.info("Unregister application from the YARN Resource Manager");
+		log.info("Unregister application from the YARN Resource Manager with final status {}.", yarnStatus);
 
 		try {
 			resourceManagerClient.unregisterApplicationMaster(yarnStatus, optionalDiagnostics, "");


[4/6] flink git commit: [hotfix] Reorder imports in ResourceProfile according to checkstyle

Posted by tr...@apache.org.
[hotfix] Reorder imports in ResourceProfile according to checkstyle


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a354a9a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a354a9a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a354a9a7

Branch: refs/heads/master
Commit: a354a9a71a24316930f67963877cf28ea8279011
Parents: 47e6069
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jan 25 10:49:29 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jan 25 15:33:30 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/clusterframework/types/ResourceProfile.java      | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a354a9a7/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index 87d6fc5..8fbaed1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.clusterframework.types;
 
-import org.apache.flink.api.common.resources.Resource;
 import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.common.resources.Resource;
 
 import javax.annotation.Nonnull;