You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/10 09:36:09 UTC

[flink] 08/16: [FLINK-12763][runtime] Yarn/MesosResourceManager do not start new worker when requested resource profile cannot be satisfied.

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 887b8d97c78ac0ada200ca904719bdd5e59a3bed
Author: Xintong Song <to...@gmail.com>
AuthorDate: Sat Jul 6 17:58:42 2019 +0800

    [FLINK-12763][runtime] Yarn/MesosResourceManager do not start new worker when requested resource profile cannot be satisfied.
---
 .../flink/mesos/runtime/clusterframework/MesosResourceManager.java | 3 +++
 .../src/main/java/org/apache/flink/yarn/YarnResourceManager.java   | 7 +++----
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index e4ef99a..6a53935 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -437,6 +437,9 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 
 	@Override
 	public Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile) {
+		if (!slotsPerWorker.iterator().next().isMatching(resourceProfile)) {
+			return Collections.emptyList();
+		}
 		LOG.info("Starting a new worker.");
 		try {
 			// generate new workers into persistent state and launch associated actors
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index f5e6c99..2e980d9 100755
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -310,11 +310,10 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 
 	@Override
 	public Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile) {
-		Preconditions.checkArgument(
-			ResourceProfile.UNKNOWN.equals(resourceProfile),
-			"The YarnResourceManager does not support custom ResourceProfiles yet. It assumes that all containers have the same resources.");
+		if (!slotsPerWorker.iterator().next().isMatching(resourceProfile)) {
+			return Collections.emptyList();
+		}
 		requestYarnContainer();
-
 		return slotsPerWorker;
 	}