You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/28 09:38:34 UTC

[flink] branch master updated (08e25db -> e663990)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 08e25db  [FLINK-10415] Fail requests with empty Netty pipeline in RestClient
     new 771277b  [FLINK-9455][RM] Add support for multi task slot TaskExecutors
     new fb50658  [hotfix] Cancel actual pending slot request in SlotManager#updateSlotState
     new 0c95396  [hotfix] Remove mocking from SlotManagerTest
     new b0ba980  [hotfix] Remove mocking from SlotProtocolTest
     new e663990  [hotfix] Start MesosWorkers with default ContaineredTaskManagerConfiguration

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../clusterframework/MesosResourceManager.java     |  47 +--
 .../clusterframework/MesosResourceManagerTest.java |   2 +-
 .../clusterframework/types/ResourceProfile.java    |   3 +
 .../runtime/resourcemanager/ResourceManager.java   |  20 +-
 .../resourcemanager/StandaloneResourceManager.java |   6 +-
 .../slotmanager/PendingSlotRequest.java            |  23 ++
 .../slotmanager/PendingTaskManagerSlot.java        |  64 ++++
 .../slotmanager/ResourceActions.java               |   5 +-
 .../resourcemanager/slotmanager/SlotManager.java   | 123 ++++++-
 ...anagerException.java => TaskManagerSlotId.java} |  18 +-
 .../runtime/taskexecutor/TaskManagerServices.java  |   3 +-
 .../resourcemanager/TestingResourceManager.java    |   7 +-
 .../slotmanager/SlotManagerTest.java               | 406 ++++++++++++++-------
 .../slotmanager/SlotProtocolTest.java              |  54 +--
 .../slotmanager/TestingResourceActions.java        |  14 +-
 .../slotmanager/TestingResourceActionsBuilder.java |  20 +-
 .../org/apache/flink/yarn/YarnResourceManager.java |  40 +-
 17 files changed, 623 insertions(+), 232 deletions(-)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManagerSlot.java
 copy flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/{SlotManagerException.java => TaskManagerSlotId.java} (67%)


[flink] 01/05: [FLINK-9455][RM] Add support for multi task slot TaskExecutors

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 771277b42427d55b52a28d5893c71c3b8de97807
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Aug 22 13:51:38 2018 +0200

    [FLINK-9455][RM] Add support for multi task slot TaskExecutors
    
    Extend ResourceActions interface to return a set of ResourceProfiles describing
    the set of slots with which the new resource will be started. The SlotManager
    stores them as PendingTaskManagerSlots which can be assigned to PendingSlotRequests.
    Only if there are no more TaskManagerSlots and PendingTaskManagerSlots, the
    SlotManager will request new resources from the ResourceManager.
    
    This closes #6734.
---
 .../clusterframework/MesosResourceManager.java     |  43 ++--
 .../clusterframework/types/ResourceProfile.java    |   3 +
 .../runtime/resourcemanager/ResourceManager.java   |  20 +-
 .../resourcemanager/StandaloneResourceManager.java |   6 +-
 .../slotmanager/PendingSlotRequest.java            |  23 ++
 .../slotmanager/PendingTaskManagerSlot.java        |  64 ++++++
 .../slotmanager/ResourceActions.java               |   5 +-
 .../resourcemanager/slotmanager/SlotManager.java   | 117 +++++++++-
 .../slotmanager/TaskManagerSlotId.java             |  36 +++
 .../runtime/taskexecutor/TaskManagerServices.java  |   3 +-
 .../resourcemanager/TestingResourceManager.java    |   7 +-
 .../slotmanager/SlotManagerTest.java               | 248 ++++++++++++++++++---
 .../slotmanager/SlotProtocolTest.java              |  10 +-
 .../slotmanager/TestingResourceActions.java        |  14 +-
 .../slotmanager/TestingResourceActionsBuilder.java |  20 +-
 .../org/apache/flink/yarn/YarnResourceManager.java |  40 ++--
 16 files changed, 556 insertions(+), 103 deletions(-)

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index f18d0d8..e7a5c98 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -24,7 +24,6 @@ import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
 import org.apache.flink.mesos.scheduler.ConnectionMonitor;
 import org.apache.flink.mesos.scheduler.LaunchCoordinator;
-import org.apache.flink.mesos.scheduler.LaunchableTask;
 import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
 import org.apache.flink.mesos.scheduler.TaskMonitor;
 import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
@@ -79,6 +78,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -124,6 +124,8 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 	@Nullable
 	private final String webUiUrl;
 
+	private final Collection<ResourceProfile> slotsPerWorker;
+
 	/** Mesos scheduler driver. */
 	private SchedulerDriver schedulerDriver;
 
@@ -191,6 +193,9 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 		this.workersInNew = new HashMap<>(8);
 		this.workersInLaunch = new HashMap<>(8);
 		this.workersBeingReturned = new HashMap<>(8);
+
+		final ContaineredTaskManagerParameters containeredTaskManagerParameters = taskManagerParameters.containeredParameters();
+		this.slotsPerWorker = createSlotsPerWorker(containeredTaskManagerParameters.numSlots());
 	}
 
 	protected ActorRef createSelfActor() {
@@ -426,7 +431,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 	}
 
 	@Override
-	public void startNewWorker(ResourceProfile resourceProfile) {
+	public Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile) {
 		LOG.info("Starting a new worker.");
 		try {
 			// generate new workers into persistent state and launch associated actors
@@ -443,9 +448,12 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 			taskMonitor.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), selfActor);
 
 			// tell the launch coordinator to launch the new tasks
-			launchCoordinator.tell(new LaunchCoordinator.Launch(Collections.singletonList((LaunchableTask) launchable)), selfActor);
+			launchCoordinator.tell(new LaunchCoordinator.Launch(Collections.singletonList(launchable)), selfActor);
+
+			return slotsPerWorker;
 		} catch (Exception ex) {
 			onFatalError(new ResourceManagerException("Unable to request new workers.", ex));
+			return Collections.emptyList();
 		}
 	}
 
@@ -691,36 +699,13 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 	/**
 	 * Creates a launchable task for Fenzo to process.
 	 */
-	private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID, ResourceProfile resourceProfile) {
-
-		// create the specific TM parameters from the resource profile and some defaults
-		MesosTaskManagerParameters params = new MesosTaskManagerParameters(
-			resourceProfile.getCpuCores() < 1.0 ? taskManagerParameters.cpus() : resourceProfile.getCpuCores(),
-			taskManagerParameters.gpus(),
-			taskManagerParameters.containerType(),
-			taskManagerParameters.containerImageName(),
-			new ContaineredTaskManagerParameters(
-				ResourceProfile.UNKNOWN.equals(resourceProfile) ? taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB() : resourceProfile.getMemoryInMB(),
-				ResourceProfile.UNKNOWN.equals(resourceProfile) ? taskManagerParameters.containeredParameters().taskManagerHeapSizeMB() : resourceProfile.getHeapMemoryInMB(),
-				ResourceProfile.UNKNOWN.equals(resourceProfile) ? taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB() : resourceProfile.getDirectMemoryInMB(),
-				1,
-				new HashMap<>(taskManagerParameters.containeredParameters().taskManagerEnv())),
-			taskManagerParameters.containerVolumes(),
-			taskManagerParameters.dockerParameters(),
-			taskManagerParameters.dockerForcePullImage(),
-			taskManagerParameters.constraints(),
-			taskManagerParameters.command(),
-			taskManagerParameters.bootstrapCommand(),
-			taskManagerParameters.getTaskManagerHostname(),
-			taskManagerParameters.uris()
-		);
-
-		LOG.debug("LaunchableMesosWorker parameters: {}", params);
+	private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID) {
+		LOG.debug("LaunchableMesosWorker parameters: {}", taskManagerParameters);
 
 		LaunchableMesosWorker launchable =
 			new LaunchableMesosWorker(
 				artifactServer,
-				params,
+				taskManagerParameters,
 				taskManagerContainerSpec,
 				taskID,
 				mesosConfig);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index a89b9f9..5b133e7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -50,6 +50,9 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 
 	public static final ResourceProfile UNKNOWN = new ResourceProfile(-1.0, -1);
 
+	/** ResourceProfile which matches any other ResourceProfile. */
+	public static final ResourceProfile ANY = new ResourceProfile(Double.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Collections.emptyMap());
+
 	// ------------------------------------------------------------------------
 
 	/** How many cpu cores are needed, use double so we can specify cpu like 0.1. */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index ac1181b..7929e31 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -71,6 +71,7 @@ import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -1019,9 +1020,10 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	 * Allocates a resource using the resource profile.
 	 *
 	 * @param resourceProfile The resource description
+	 * @return Collection of {@link ResourceProfile} describing the launched slots
 	 */
 	@VisibleForTesting
-	public abstract void startNewWorker(ResourceProfile resourceProfile);
+	public abstract Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile);
 
 	/**
 	 * Callback when a worker was started.
@@ -1051,9 +1053,9 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 		}
 
 		@Override
-		public void allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException {
+		public Collection<ResourceProfile> allocateResource(ResourceProfile resourceProfile) {
 			validateRunsInMainThread();
-			startNewWorker(resourceProfile);
+			return startNewWorker(resourceProfile);
 		}
 
 		@Override
@@ -1176,8 +1178,16 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	//  Resource Management
 	// ------------------------------------------------------------------------
 
-	protected int getNumberPendingSlotRequests() {
-		return slotManager.getNumberPendingSlotRequests();
+	protected int getNumberRequiredTaskManagerSlots() {
+		return slotManager.getNumberPendingTaskManagerSlots();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Helper methods
+	// ------------------------------------------------------------------------
+
+	protected static Collection<ResourceProfile> createSlotsPerWorker(int numSlots) {
+		return Collections.nCopies(numSlots, ResourceProfile.ANY);
 	}
 }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index 420b89f..064c2d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -32,6 +32,9 @@ import org.apache.flink.runtime.rpc.RpcService;
 
 import javax.annotation.Nullable;
 
+import java.util.Collection;
+import java.util.Collections;
+
 /**
  * A standalone implementation of the resource manager. Used when the system is started in
  * standalone mode (via scripts), rather than via a resource framework like YARN or Mesos.
@@ -74,7 +77,8 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 	}
 
 	@Override
-	public void startNewWorker(ResourceProfile resourceProfile) {
+	public Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile) {
+		return Collections.emptyList();
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
index 17cf8c7..a8f212f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
@@ -25,10 +25,14 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.concurrent.CompletableFuture;
 
+/**
+ * Class representing a pending slot request in the {@link SlotManager}.
+ */
 public class PendingSlotRequest {
 
 	private final SlotRequest slotRequest;
@@ -36,11 +40,16 @@ public class PendingSlotRequest {
 	@Nullable
 	private CompletableFuture<Acknowledge> requestFuture;
 
+	@Nullable
+	private PendingTaskManagerSlot pendingTaskManagerSlot;
+
 	/** Timestamp when this pending slot request has been created. */
 	private final long creationTimestamp;
 
 	public PendingSlotRequest(SlotRequest slotRequest) {
 		this.slotRequest = Preconditions.checkNotNull(slotRequest);
+		this.requestFuture = null;
+		this.pendingTaskManagerSlot = null;
 		creationTimestamp = System.currentTimeMillis();
 	}
 
@@ -78,4 +87,18 @@ public class PendingSlotRequest {
 	public CompletableFuture<Acknowledge> getRequestFuture() {
 		return requestFuture;
 	}
+
+	@Nullable
+	public PendingTaskManagerSlot getAssignedPendingTaskManagerSlot() {
+		return pendingTaskManagerSlot;
+	}
+
+	public void assignPendingTaskManagerSlot(@Nonnull PendingTaskManagerSlot pendingTaskManagerSlotToAssign) {
+		Preconditions.checkState(pendingTaskManagerSlot == null);
+		this.pendingTaskManagerSlot = pendingTaskManagerSlotToAssign;
+	}
+
+	public void unassignPendingTaskManagerSlot() {
+		this.pendingTaskManagerSlot = null;
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManagerSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManagerSlot.java
new file mode 100644
index 0000000..ed207e9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManagerSlot.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/**
+ * Represents a pending task manager slot in the {@link SlotManager}.
+ */
+public class PendingTaskManagerSlot {
+
+	private final TaskManagerSlotId taskManagerSlotId = TaskManagerSlotId.generate();
+
+	private final ResourceProfile resourceProfile;
+
+	@Nullable
+	private PendingSlotRequest pendingSlotRequest;
+
+	public PendingTaskManagerSlot(ResourceProfile resourceProfile) {
+		this.resourceProfile = resourceProfile;
+	}
+
+	public TaskManagerSlotId getTaskManagerSlotId() {
+		return taskManagerSlotId;
+	}
+
+	public ResourceProfile getResourceProfile() {
+		return resourceProfile;
+	}
+
+	public void assignPendingSlotRequest(@Nonnull PendingSlotRequest pendingSlotRequestToAssign) {
+		Preconditions.checkState(pendingSlotRequest == null);
+		pendingSlotRequest = pendingSlotRequestToAssign;
+	}
+
+	public void unassignPendingSlotRequest() {
+		pendingSlotRequest = null;
+	}
+
+	@Nullable
+	public PendingSlotRequest getAssignedPendingSlotRequest() {
+		return pendingSlotRequest;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
index 84e7c4e..adf8f13 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
@@ -24,6 +24,8 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
 
+import java.util.Collection;
+
 /**
  * Resource related actions which the {@link SlotManager} can perform.
  */
@@ -41,9 +43,10 @@ public interface ResourceActions {
 	 * Requests to allocate a resource with the given {@link ResourceProfile}.
 	 *
 	 * @param resourceProfile for the to be allocated resource
+	 * @return Collection of {@link ResourceProfile} describing the allocated slots
 	 * @throws ResourceManagerException if the resource cannot be allocated
 	 */
-	void allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException;
+	Collection<ResourceProfile> allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException;
 
 	/**
 	 * Notifies that an allocation failure has occurred.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index bab5660..2fa1844 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -43,14 +43,17 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -99,6 +102,8 @@ public class SlotManager implements AutoCloseable {
 	/** Map of pending/unfulfilled slot allocation requests. */
 	private final HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests;
 
+	private final HashMap<TaskManagerSlotId, PendingTaskManagerSlot> pendingSlots;
+
 	/** ResourceManager's id. */
 	private ResourceManagerId resourceManagerId;
 
@@ -130,6 +135,7 @@ public class SlotManager implements AutoCloseable {
 		taskManagerRegistrations = new HashMap<>(4);
 		fulfilledSlotRequests = new HashMap<>(16);
 		pendingSlotRequests = new HashMap<>(16);
+		pendingSlots = new HashMap<>(16);
 
 		resourceManagerId = null;
 		resourceActions = null;
@@ -168,8 +174,13 @@ public class SlotManager implements AutoCloseable {
 		}
 	}
 
-	public int getNumberPendingSlotRequests() {
-		return pendingSlotRequests.size();
+	public int getNumberPendingTaskManagerSlots() {
+		return pendingSlots.size();
+	}
+
+	@VisibleForTesting
+	int getNumberAssignedPendingTaskManagerSlots() {
+		return (int) pendingSlots.values().stream().filter(slot -> slot.getAssignedPendingSlotRequest() != null).count();
 	}
 
 	// ---------------------------------------------------------------------------------------------
@@ -530,14 +541,50 @@ public class SlotManager implements AutoCloseable {
 			removeSlot(slotId);
 		}
 
-		TaskManagerSlot slot = new TaskManagerSlot(
+		final TaskManagerSlot slot = createAndRegisterTaskManagerSlot(slotId, resourceProfile, taskManagerConnection);
+
+		final PendingTaskManagerSlot pendingTaskManagerSlot;
+
+		if (allocationId == null) {
+			pendingTaskManagerSlot = findExactlyMatchingPendingTaskManagerSlot(resourceProfile);
+		} else {
+			pendingTaskManagerSlot = null;
+		}
+
+		if (pendingTaskManagerSlot == null) {
+			updateSlot(slotId, allocationId, jobId);
+		} else {
+			pendingSlots.remove(pendingTaskManagerSlot.getTaskManagerSlotId());
+			final PendingSlotRequest assignedPendingSlotRequest = pendingTaskManagerSlot.getAssignedPendingSlotRequest();
+
+			if (assignedPendingSlotRequest == null) {
+				handleFreeSlot(slot);
+			} else {
+				assignedPendingSlotRequest.unassignPendingTaskManagerSlot();
+				allocateSlot(slot, assignedPendingSlotRequest);
+			}
+		}
+	}
+
+	@Nonnull
+	private TaskManagerSlot createAndRegisterTaskManagerSlot(SlotID slotId, ResourceProfile resourceProfile, TaskExecutorConnection taskManagerConnection) {
+		final TaskManagerSlot slot = new TaskManagerSlot(
 			slotId,
 			resourceProfile,
 			taskManagerConnection);
-
 		slots.put(slotId, slot);
+		return slot;
+	}
 
-		updateSlot(slotId, allocationId, jobId);
+	@Nullable
+	private PendingTaskManagerSlot findExactlyMatchingPendingTaskManagerSlot(ResourceProfile resourceProfile) {
+		for (PendingTaskManagerSlot pendingTaskManagerSlot : pendingSlots.values()) {
+			if (pendingTaskManagerSlot.getResourceProfile().equals(resourceProfile)) {
+				return pendingTaskManagerSlot;
+			}
+		}
+
+		return null;
 	}
 
 	/**
@@ -650,15 +697,56 @@ public class SlotManager implements AutoCloseable {
 	 * @throws ResourceManagerException if the resource manager cannot allocate more resource
 	 */
 	private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
-		TaskManagerSlot taskManagerSlot = findMatchingSlot(pendingSlotRequest.getResourceProfile());
+		final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();
+		TaskManagerSlot taskManagerSlot = findMatchingSlot(resourceProfile);
 
 		if (taskManagerSlot != null) {
 			allocateSlot(taskManagerSlot, pendingSlotRequest);
 		} else {
-			resourceActions.allocateResource(pendingSlotRequest.getResourceProfile());
+			Optional<PendingTaskManagerSlot> pendingTaskManagerSlotOptional = findFreeMatchingPendingTaskManagerSlot(resourceProfile);
+
+			if (!pendingTaskManagerSlotOptional.isPresent()) {
+				pendingTaskManagerSlotOptional = allocateResource(resourceProfile);
+			}
+
+			pendingTaskManagerSlotOptional.ifPresent(pendingTaskManagerSlot -> assignPendingTaskManagerSlot(pendingSlotRequest, pendingTaskManagerSlot));
+		}
+	}
+
+	private Optional<PendingTaskManagerSlot> findFreeMatchingPendingTaskManagerSlot(ResourceProfile requiredResourceProfile) {
+		for (PendingTaskManagerSlot pendingTaskManagerSlot : pendingSlots.values()) {
+			if (pendingTaskManagerSlot.getAssignedPendingSlotRequest() == null && pendingTaskManagerSlot.getResourceProfile().isMatching(requiredResourceProfile)) {
+				return Optional.of(pendingTaskManagerSlot);
+			}
+		}
+
+		return Optional.empty();
+	}
+
+	private Optional<PendingTaskManagerSlot> allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException {
+		final Collection<ResourceProfile> requestedSlots = resourceActions.allocateResource(resourceProfile);
+
+		if (requestedSlots.isEmpty()) {
+			return Optional.empty();
+		} else {
+			final Iterator<ResourceProfile> slotIterator = requestedSlots.iterator();
+			final PendingTaskManagerSlot pendingTaskManagerSlot = new PendingTaskManagerSlot(slotIterator.next());
+			pendingSlots.put(pendingTaskManagerSlot.getTaskManagerSlotId(), pendingTaskManagerSlot);
+
+			while (slotIterator.hasNext()) {
+				final PendingTaskManagerSlot additionalPendingTaskManagerSlot = new PendingTaskManagerSlot(slotIterator.next());
+				pendingSlots.put(additionalPendingTaskManagerSlot.getTaskManagerSlotId(), additionalPendingTaskManagerSlot);
+			}
+
+			return Optional.of(pendingTaskManagerSlot);
 		}
 	}
 
+	private void assignPendingTaskManagerSlot(PendingSlotRequest pendingSlotRequest, PendingTaskManagerSlot pendingTaskManagerSlot) {
+		pendingTaskManagerSlot.assignPendingSlotRequest(pendingSlotRequest);
+		pendingSlotRequest.assignPendingTaskManagerSlot(pendingTaskManagerSlot);
+	}
+
 	/**
 	 * Allocates the given slot for the given slot request. This entails sending a registration
 	 * message to the task manager and treating failures.
@@ -680,6 +768,8 @@ public class SlotManager implements AutoCloseable {
 		taskManagerSlot.assignPendingSlotRequest(pendingSlotRequest);
 		pendingSlotRequest.setRequestFuture(completableFuture);
 
+		returnPendingTaskManagerSlotIfAssigned(pendingSlotRequest);
+
 		TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceID);
 
 		if (taskManagerRegistration == null) {
@@ -733,6 +823,14 @@ public class SlotManager implements AutoCloseable {
 			mainThreadExecutor);
 	}
 
+	private void returnPendingTaskManagerSlotIfAssigned(PendingSlotRequest pendingSlotRequest) {
+		final PendingTaskManagerSlot pendingTaskManagerSlot = pendingSlotRequest.getAssignedPendingTaskManagerSlot();
+		if (pendingTaskManagerSlot != null) {
+			pendingTaskManagerSlot.unassignPendingSlotRequest();
+			pendingSlotRequest.unassignPendingTaskManagerSlot();
+		}
+	}
+
 	/**
 	 * Handles a free slot. It first tries to find a pending slot request which can be fulfilled.
 	 * If there is no such request, then it will add the slot to the set of free slots.
@@ -886,6 +984,8 @@ public class SlotManager implements AutoCloseable {
 	private void cancelPendingSlotRequest(PendingSlotRequest pendingSlotRequest) {
 		CompletableFuture<Acknowledge> request = pendingSlotRequest.getRequestFuture();
 
+		returnPendingTaskManagerSlotIfAssigned(pendingSlotRequest);
+
 		if (null != request) {
 			request.cancel(false);
 		}
@@ -911,9 +1011,10 @@ public class SlotManager implements AutoCloseable {
 			}
 
 			// second we trigger the release resource callback which can decide upon the resource release
+			final FlinkException cause = new FlinkException("TaskExecutor exceeded the idle timeout.");
 			for (InstanceID timedOutTaskManagerId : timedOutTaskManagerIds) {
 				LOG.debug("Release TaskExecutor {} because it exceeded the idle timeout.", timedOutTaskManagerId);
-				resourceActions.releaseResource(timedOutTaskManagerId, new FlinkException("TaskExecutor exceeded the idle timeout."));
+				resourceActions.releaseResource(timedOutTaskManagerId, cause);
 			}
 		}
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotId.java
new file mode 100644
index 0000000..3084b3e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotId.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
+import org.apache.flink.util.AbstractID;
+
+/**
+ * Id of {@link TaskManagerSlot} and {@link PendingTaskManagerSlot}.
+ */
+public class TaskManagerSlotId extends AbstractID {
+
+	private static final long serialVersionUID = -4024240625523472071L;
+
+	private TaskManagerSlotId() {}
+
+	public static TaskManagerSlotId generate() {
+		return new TaskManagerSlotId();
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 0e98e44..c46d800 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -246,7 +246,7 @@ public class TaskManagerServices {
 		final List<ResourceProfile> resourceProfiles = new ArrayList<>(taskManagerServicesConfiguration.getNumberOfSlots());
 
 		for (int i = 0; i < taskManagerServicesConfiguration.getNumberOfSlots(); i++) {
-			resourceProfiles.add(new ResourceProfile(1.0, 42));
+			resourceProfiles.add(ResourceProfile.ANY);
 		}
 
 		final TimerService<AllocationID> timerService = new TimerService<>(
@@ -259,7 +259,6 @@ public class TaskManagerServices {
 
 		final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
 
-
 		final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();
 
 		final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length];
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
index 0b56231..e820701 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
@@ -32,6 +32,9 @@ import org.apache.flink.runtime.rpc.RpcService;
 
 import javax.annotation.Nullable;
 
+import java.util.Collection;
+import java.util.Collections;
+
 /**
  * Simple {@link ResourceManager} implementation for testing purposes.
  */
@@ -71,8 +74,8 @@ public class TestingResourceManager extends ResourceManager<ResourceID> {
 	}
 
 	@Override
-	public void startNewWorker(ResourceProfile resourceProfile) {
-		// noop
+	public Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile) {
+		return Collections.emptyList();
 	}
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 8a7f733..854d27c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.FunctionWithException;
 
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -54,7 +55,9 @@ import org.mockito.ArgumentCaptor;
 import javax.annotation.Nonnull;
 
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -67,6 +70,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
@@ -202,13 +206,16 @@ public class SlotManagerTest extends TestLogger {
 			resourceProfile,
 			"localhost");
 
-		ResourceActions resourceManagerActions = mock(ResourceActions.class);
+		CompletableFuture<ResourceProfile> allocateResourceFuture = new CompletableFuture<>();
+		ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
+			.setAllocateResourceConsumer(allocateResourceFuture::complete)
+			.build();
 
 		try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
 
 			slotManager.registerSlotRequest(slotRequest);
 
-			verify(resourceManagerActions).allocateResource(eq(resourceProfile));
+			assertThat(allocateResourceFuture.get(), is(equalTo(resourceProfile)));
 		}
 	}
 
@@ -357,7 +364,10 @@ public class SlotManagerTest extends TestLogger {
 			resourceProfile,
 			targetAddress);
 
-		ResourceActions resourceManagerActions = mock(ResourceActions.class);
+		final AtomicInteger numberAllocateResourceCalls = new AtomicInteger(0);
+		ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
+			.setAllocateResourceConsumer(ignored -> numberAllocateResourceCalls.incrementAndGet())
+			.build();
 
 		// accept an incoming slot request
 		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
@@ -378,7 +388,7 @@ public class SlotManagerTest extends TestLogger {
 
 			assertTrue("The slot request should be accepted", slotManager.registerSlotRequest(slotRequest));
 
-			verify(resourceManagerActions, times(1)).allocateResource(eq(resourceProfile));
+			assertThat(numberAllocateResourceCalls.get(), is(1));
 
 			slotManager.registerTaskManager(
 				taskExecutorConnection,
@@ -444,7 +454,10 @@ public class SlotManagerTest extends TestLogger {
 	@Test
 	public void testDuplicatePendingSlotRequest() throws Exception {
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
-		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
+		final AtomicInteger numberAllocateResourceFunctionCalls = new AtomicInteger(0);
+		final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
+			.setAllocateResourceConsumer(resourceProfile -> numberAllocateResourceFunctionCalls.incrementAndGet())
+			.build();
 		final AllocationID allocationId = new AllocationID();
 		final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 2);
 		final ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 1);
@@ -458,7 +471,7 @@ public class SlotManagerTest extends TestLogger {
 
 		// check that we have only called the resource allocation only for the first slot request,
 		// since the second request is a duplicate
-		verify(resourceManagerActions, times(1)).allocateResource(any(ResourceProfile.class));
+		assertThat(numberAllocateResourceFunctionCalls.get(), is(1));
 	}
 
 	/**
@@ -723,7 +736,10 @@ public class SlotManagerTest extends TestLogger {
 	public void testSlotRequestTimeout() throws Exception {
 		final long allocationTimeout = 50L;
 
-		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
+		final CompletableFuture<Tuple2<JobID, AllocationID>> failedAllocationFuture = new CompletableFuture<>();
+		final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
+			.setNotifyAllocationFailureConsumer(tuple3 -> failedAllocationFuture.complete(Tuple2.of(tuple3.f0, tuple3.f1)))
+			.build();
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 		final JobID jobId = new JobID();
 		final AllocationID allocationId = new AllocationID();
@@ -743,21 +759,15 @@ public class SlotManagerTest extends TestLogger {
 
 			final AtomicReference<Exception> atomicException = new AtomicReference<>(null);
 
-			mainThreadExecutor.execute(new Runnable() {
-				@Override
-				public void run() {
-					try {
-						assertTrue(slotManager.registerSlotRequest(slotRequest));
-					} catch (Exception e) {
-						atomicException.compareAndSet(null, e);
-					}
+			mainThreadExecutor.execute(() -> {
+				try {
+					assertTrue(slotManager.registerSlotRequest(slotRequest));
+				} catch (Exception e) {
+					atomicException.compareAndSet(null, e);
 				}
 			});
 
-			verify(resourceManagerActions, timeout(100L * allocationTimeout).times(1)).notifyAllocationFailure(
-				eq(jobId),
-				eq(allocationId),
-				any(TimeoutException.class));
+			assertThat(failedAllocationFuture.get(), is(equalTo(Tuple2.of(jobId, allocationId))));
 
 			if (atomicException.get() != null) {
 				throw atomicException.get();
@@ -851,7 +861,7 @@ public class SlotManagerTest extends TestLogger {
 	public void testSlotReportWhileActiveSlotRequest() throws Exception {
 		final long verifyTimeout = 10000L;
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
-		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
+		final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();
 
 		final JobID jobId = new JobID();
 		final AllocationID allocationId = new AllocationID();
@@ -969,7 +979,10 @@ public class SlotManagerTest extends TestLogger {
 		final long verifyTimeout = taskManagerTimeout * 10L;
 
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
-		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
+		final CompletableFuture<InstanceID> releasedResourceFuture = new CompletableFuture<>();
+		final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
+			.setReleaseResourceConsumer((instanceID, e) -> releasedResourceFuture.complete(instanceID))
+			.build();
 		final ScheduledExecutor scheduledExecutor = TestingUtils.defaultScheduledExecutor();
 
 		final ResourceID resourceId = ResourceID.generate();
@@ -1052,7 +1065,7 @@ public class SlotManagerTest extends TestLogger {
 
 			assertTrue(idleFuture2.get());
 
-			verify(resourceManagerActions, timeout(verifyTimeout).times(1)).releaseResource(eq(taskManagerConnection.getInstanceID()), any(Exception.class));
+			assertThat(releasedResourceFuture.get(), is(equalTo(taskManagerConnection.getInstanceID())));
 		}
 	}
 
@@ -1109,7 +1122,7 @@ public class SlotManagerTest extends TestLogger {
 	@Test
 	public void testReportAllocatedSlot() throws Exception {
 		final ResourceID taskManagerId = ResourceID.generate();
-		final ResourceActions resourceActions = mock(ResourceActions.class);
+		final ResourceActions resourceActions = new TestingResourceActionsBuilder().build();
 		final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
 		final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskManagerId, taskExecutorGateway);
 
@@ -1167,7 +1180,7 @@ public class SlotManagerTest extends TestLogger {
 	@Test
 	public void testSlotRequestFailure() throws Exception {
 		try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(),
-			new TestingResourceActionsBuilder().createTestingResourceActions())) {
+			new TestingResourceActionsBuilder().build())) {
 
 			final SlotRequest slotRequest = new SlotRequest(new JobID(), new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
 			slotManager.registerSlotRequest(slotRequest);
@@ -1222,7 +1235,7 @@ public class SlotManagerTest extends TestLogger {
 	@Test
 	public void testSlotRequestRemovedIfTMReportAllocation() throws Exception {
 		try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(),
-				new TestingResourceActionsBuilder().createTestingResourceActions())) {
+				new TestingResourceActionsBuilder().build())) {
 
 			final JobID jobID = new JobID();
 			final SlotRequest slotRequest1 = new SlotRequest(jobID, new AllocationID(), ResourceProfile.UNKNOWN, "foobar");
@@ -1293,7 +1306,7 @@ public class SlotManagerTest extends TestLogger {
 			.setNotifyAllocationFailureConsumer(
 				(Tuple3<JobID, AllocationID, Exception> failureMessage) ->
 					allocationFailures.offer(Tuple2.of(failureMessage.f0, failureMessage.f1)))
-			.createTestingResourceActions();
+			.build();
 
 		try (final SlotManager slotManager = createSlotManager(
 			ResourceManagerId.generate(),
@@ -1374,17 +1387,27 @@ public class SlotManagerTest extends TestLogger {
 
 	@Nonnull
 	private SlotReport createSlotReport(ResourceID taskExecutorResourceId, int numberSlots) {
+		return createSlotReport(taskExecutorResourceId, numberSlots, ResourceProfile.UNKNOWN);
+	}
+
+	@Nonnull
+	private SlotReport createSlotReport(ResourceID taskExecutorResourceId, int numberSlots, ResourceProfile resourceProfile) {
 		final Set<SlotStatus> slotStatusSet = new HashSet<>(numberSlots);
 		for (int i = 0; i < numberSlots; i++) {
-			slotStatusSet.add(new SlotStatus(new SlotID(taskExecutorResourceId, i), ResourceProfile.UNKNOWN));
+			slotStatusSet.add(new SlotStatus(new SlotID(taskExecutorResourceId, i), resourceProfile));
 		}
 
 		return new SlotReport(slotStatusSet);
 	}
 
 	@Nonnull
-	private SlotRequest createSlotRequest(JobID jobId1) {
-		return new SlotRequest(jobId1, new AllocationID(), ResourceProfile.UNKNOWN, "foobar1");
+	private SlotRequest createSlotRequest(JobID jobId) {
+		return createSlotRequest(jobId, ResourceProfile.UNKNOWN);
+	}
+
+	@Nonnull
+	private SlotRequest createSlotRequest(JobID jobId, ResourceProfile resourceProfile) {
+		return new SlotRequest(jobId, new AllocationID(), resourceProfile, "foobar1");
 	}
 
 	private SlotManager createSlotManager(ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions) {
@@ -1398,4 +1421,171 @@ public class SlotManagerTest extends TestLogger {
 
 		return slotManager;
 	}
+
+	/**
+	 * Tests that we only request new resources/containers once we have assigned
+	 * all pending task manager slots.
+	 */
+	@Test
+	public void testRequestNewResources() throws Exception {
+		final int numberSlots = 2;
+		final AtomicInteger resourceRequests = new AtomicInteger(0);
+		final TestingResourceActions testingResourceActions = new TestingResourceActionsBuilder()
+			.setAllocateResourceFunction(
+				convert(ignored -> {
+					resourceRequests.incrementAndGet();
+					return numberSlots;
+				}))
+			.build();
+
+		try (final SlotManager slotManager = createSlotManager(
+			ResourceManagerId.generate(),
+			testingResourceActions)) {
+
+			final JobID jobId = new JobID();
+			assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true));
+			assertThat(resourceRequests.get(), is(1));
+
+			// the second slot request should not try to allocate a new resource because the
+			// previous resource was started with 2 slots.
+			assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true));
+			assertThat(resourceRequests.get(), is(1));
+
+			assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(2));
+
+			assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true));
+			assertThat(resourceRequests.get(), is(2));
+		}
+	}
+
+	/**
+	 * Tests that a failing allocation/slot request will return the pending task manager slot.
+	 */
+	@Test
+	public void testFailingAllocationReturnsPendingTaskManagerSlot() throws Exception {
+		final int numberSlots = 2;
+		final TestingResourceActions resourceActions = new TestingResourceActionsBuilder()
+			.setAllocateResourceFunction(convert(value -> numberSlots))
+			.build();
+		try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), resourceActions)) {
+			final JobID jobId = new JobID();
+
+			final SlotRequest slotRequest = createSlotRequest(jobId);
+			assertThat(slotManager.registerSlotRequest(slotRequest), is(true));
+
+			assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots));
+			assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(1));
+
+			slotManager.unregisterSlotRequest(slotRequest.getAllocationId());
+
+			assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots));
+			assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(0));
+		}
+	}
+
+	/**
+	 * Tests the completion of pending task manager slots by registering a TaskExecutor.
+	 */
+	@Test
+	public void testPendingTaskManagerSlotCompletion() throws Exception {
+		final int numberSlots = 3;
+		final TestingResourceActions resourceActions = new TestingResourceActionsBuilder()
+			.setAllocateResourceFunction(convert(value -> numberSlots))
+			.build();
+
+		try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), resourceActions)) {
+			final JobID jobId = new JobID();
+			assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true));
+
+			assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots));
+			assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(1));
+			assertThat(slotManager.getNumberRegisteredSlots(), is(0));
+
+			final TaskExecutorConnection taskExecutorConnection = createTaskExecutorConnection();
+			final SlotReport slotReport = createSlotReport(taskExecutorConnection.getResourceID(), numberSlots - 1);
+
+			slotManager.registerTaskManager(taskExecutorConnection, slotReport);
+
+			assertThat(slotManager.getNumberRegisteredSlots(), is(numberSlots - 1));
+			assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(1));
+		}
+	}
+
+	private TaskExecutorConnection createTaskExecutorConnection() {
+		final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
+		return new TaskExecutorConnection(ResourceID.generate(), taskExecutorGateway);
+	}
+
+	/**
+	 * Tests that a different slot can fulfill a pending slot request. If the
+	 * pending slot request has a pending task manager slot assigned, it should
+	 * be freed.
+	 */
+	@Test
+	public void testRegistrationOfDifferentSlot() throws Exception {
+		final int numberSlots = 1;
+		final TestingResourceActions resourceActions = new TestingResourceActionsBuilder()
+			.setAllocateResourceFunction(convert(value -> numberSlots))
+			.build();
+
+		try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), resourceActions)) {
+			final JobID jobId = new JobID();
+			final ResourceProfile requestedSlotProfile = new ResourceProfile(1.0, 1);
+
+			assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId, requestedSlotProfile)), is(true));
+
+			assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots));
+
+			final int numberOfferedSlots = 1;
+			final TaskExecutorConnection taskExecutorConnection = createTaskExecutorConnection();
+			final ResourceProfile offeredSlotProfile = new ResourceProfile(2.0, 2);
+			final SlotReport slotReport = createSlotReport(taskExecutorConnection.getResourceID(), numberOfferedSlots, offeredSlotProfile);
+
+			slotManager.registerTaskManager(taskExecutorConnection, slotReport);
+
+			assertThat(slotManager.getNumberRegisteredSlots(), is(numberOfferedSlots));
+			assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots));
+			assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(0));
+		}
+	}
+
+	/**
+	 * Tests that only free slots can fulfill/complete a pending task manager slot.
+	 */
+	@Test
+	public void testOnlyFreeSlotsCanFulfillPendingTaskManagerSlot() throws Exception {
+		final int numberSlots = 1;
+		final TestingResourceActions resourceActions = new TestingResourceActionsBuilder()
+			.setAllocateResourceFunction(convert(value -> numberSlots))
+			.build();
+
+		try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), resourceActions)) {
+			final JobID jobId = new JobID();
+			assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true));
+
+			final TaskExecutorConnection taskExecutorConnection = createTaskExecutorConnection();
+			final SlotID slotId = new SlotID(taskExecutorConnection.getResourceID(), 0);
+			final SlotStatus slotStatus = new SlotStatus(slotId, ResourceProfile.UNKNOWN, jobId, new AllocationID());
+			final SlotReport slotReport = new SlotReport(slotStatus);
+
+			slotManager.registerTaskManager(taskExecutorConnection, slotReport);
+
+			assertThat(slotManager.getNumberRegisteredSlots(), is(1));
+			assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots));
+			assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(1));
+		}
+	}
+
+	private static FunctionWithException<ResourceProfile, Collection<ResourceProfile>, ResourceManagerException> convert(FunctionWithException<ResourceProfile, Integer, ResourceManagerException> function) {
+		return (ResourceProfile resourceProfile) -> {
+			final int slots = function.apply(resourceProfile);
+
+			final ArrayList<ResourceProfile> result = new ArrayList<>(slots);
+			for (int i = 0; i < slots; i++) {
+				result.add(resourceProfile);
+			}
+
+			return result;
+		};
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 8f6317c..51e6b0b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -47,6 +47,9 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
@@ -87,7 +90,10 @@ public class SlotProtocolTest extends TestLogger {
 			TestingUtils.infiniteTime(),
 			TestingUtils.infiniteTime())) {
 
-			ResourceActions resourceManagerActions = mock(ResourceActions.class);
+			final CompletableFuture<ResourceProfile> resourceProfileFuture = new CompletableFuture<>();
+			ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
+				.setAllocateResourceConsumer(resourceProfile -> resourceProfileFuture.complete(resourceProfile))
+				.build();
 
 			slotManager.start(rmLeaderID, Executors.directExecutor(), resourceManagerActions);
 
@@ -99,7 +105,7 @@ public class SlotProtocolTest extends TestLogger {
 
 			slotManager.registerSlotRequest(slotRequest);
 
-			verify(resourceManagerActions).allocateResource(eq(slotRequest.getResourceProfile()));
+			assertThat(resourceProfileFuture.get(), is(equalTo(slotRequest.getResourceProfile())));
 
 			// slot becomes available
 			TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java
index 8b7c802..4c6f14c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java
@@ -23,9 +23,12 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.util.function.FunctionWithException;
 
 import javax.annotation.Nonnull;
 
+import java.util.Collection;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
@@ -38,29 +41,28 @@ public class TestingResourceActions implements ResourceActions {
 	private final BiConsumer<InstanceID, Exception> releaseResourceConsumer;
 
 	@Nonnull
-	private final Consumer<ResourceProfile> allocateResourceConsumer;
+	private final FunctionWithException<ResourceProfile, Collection<ResourceProfile>, ResourceManagerException> allocateResourceFunction;
 
 	@Nonnull
 	private final Consumer<Tuple3<JobID, AllocationID, Exception>> notifyAllocationFailureConsumer;
 
 	public TestingResourceActions(
 			@Nonnull BiConsumer<InstanceID, Exception> releaseResourceConsumer,
-			@Nonnull Consumer<ResourceProfile> allocateResourceConsumer,
+			@Nonnull FunctionWithException<ResourceProfile, Collection<ResourceProfile>, ResourceManagerException> allocateResourceFunction,
 			@Nonnull Consumer<Tuple3<JobID, AllocationID, Exception>> notifyAllocationFailureConsumer) {
 		this.releaseResourceConsumer = releaseResourceConsumer;
-		this.allocateResourceConsumer = allocateResourceConsumer;
+		this.allocateResourceFunction = allocateResourceFunction;
 		this.notifyAllocationFailureConsumer = notifyAllocationFailureConsumer;
 	}
 
-
 	@Override
 	public void releaseResource(InstanceID instanceId, Exception cause) {
 		releaseResourceConsumer.accept(instanceId, cause);
 	}
 
 	@Override
-	public void allocateResource(ResourceProfile resourceProfile) {
-		allocateResourceConsumer.accept(resourceProfile);
+	public Collection<ResourceProfile> allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException {
+		return allocateResourceFunction.apply(resourceProfile);
 	}
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java
index 2c1d47e..ac7afd4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java
@@ -23,7 +23,11 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.util.function.FunctionWithException;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
@@ -32,7 +36,7 @@ import java.util.function.Consumer;
  */
 public class TestingResourceActionsBuilder {
 	private BiConsumer<InstanceID, Exception> releaseResourceConsumer = (ignoredA, ignoredB) -> {};
-	private Consumer<ResourceProfile> allocateResourceConsumer = (ignored) -> {};
+	private FunctionWithException<ResourceProfile, Collection<ResourceProfile>, ResourceManagerException> allocateResourceFunction = (ignored) -> Collections.singleton(ResourceProfile.UNKNOWN);
 	private Consumer<Tuple3<JobID, AllocationID, Exception>> notifyAllocationFailureConsumer = (ignored) -> {};
 
 	public TestingResourceActionsBuilder setReleaseResourceConsumer(BiConsumer<InstanceID, Exception> releaseResourceConsumer) {
@@ -40,8 +44,16 @@ public class TestingResourceActionsBuilder {
 		return this;
 	}
 
+	public TestingResourceActionsBuilder setAllocateResourceFunction(FunctionWithException<ResourceProfile, Collection<ResourceProfile>, ResourceManagerException> allocateResourceFunction) {
+		this.allocateResourceFunction = allocateResourceFunction;
+		return this;
+	}
+
 	public TestingResourceActionsBuilder setAllocateResourceConsumer(Consumer<ResourceProfile> allocateResourceConsumer) {
-		this.allocateResourceConsumer = allocateResourceConsumer;
+		this.allocateResourceFunction = (ResourceProfile resourceProfile) -> {
+			allocateResourceConsumer.accept(resourceProfile);
+			return Collections.singleton(ResourceProfile.UNKNOWN);
+		};
 		return this;
 	}
 
@@ -50,7 +62,7 @@ public class TestingResourceActionsBuilder {
 		return this;
 	}
 
-	public TestingResourceActions createTestingResourceActions() {
-		return new TestingResourceActions(releaseResourceConsumer, allocateResourceConsumer, notifyAllocationFailureConsumer);
+	public TestingResourceActions build() {
+		return new TestingResourceActions(releaseResourceConsumer, allocateResourceFunction, notifyAllocationFailureConsumer);
 	}
 }
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 956e40f..2a43b8b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
 import javax.annotation.Nullable;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -115,6 +116,8 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 
 	private final Map<ResourceProfile, Integer> resourcePriorities = new HashMap<>();
 
+	private final Collection<ResourceProfile> slotsPerWorker;
+
 	public YarnResourceManager(
 			RpcService rpcService,
 			String resourceManagerEndpointId,
@@ -163,6 +166,8 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 		this.numberOfTaskSlots = flinkConfig.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
 		this.defaultTaskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(flinkConfig).getMebiBytes();
 		this.defaultCpus = flinkConfig.getInteger(YarnConfigOptions.VCORES, numberOfTaskSlots);
+
+		this.slotsPerWorker = createSlotsPerWorker(numberOfTaskSlots);
 	}
 
 	protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient(
@@ -283,7 +288,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 	}
 
 	@Override
-	public void startNewWorker(ResourceProfile resourceProfile) {
+	public Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile) {
 		// Priority for worker containers - priorities are intra-application
 		//TODO: set priority according to the resource allocated
 		Priority priority = Priority.newInstance(generatePriority(resourceProfile));
@@ -291,6 +296,8 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 		int vcore = resourceProfile.getCpuCores() < 1 ? defaultCpus : (int) resourceProfile.getCpuCores();
 		Resource capability = Resource.newInstance(mem, vcore);
 		requestYarnContainer(capability, priority);
+
+		return slotsPerWorker;
 	}
 
 	@Override
@@ -334,7 +341,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 					if (yarnWorkerNode != null) {
 						// Container completed unexpectedly ~> start a new one
 						final Container container = yarnWorkerNode.getContainer();
-						requestYarnContainer(container.getResource(), yarnWorkerNode.getContainer().getPriority());
+						requestYarnContainerIfRequired(container.getResource(), yarnWorkerNode.getContainer().getPriority());
 					}
 					// Eagerly close the connection with task manager.
 					closeTaskManagerConnection(resourceId, new Exception(containerStatus.getDiagnostics()));
@@ -375,7 +382,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 						workerNodeMap.remove(resourceId);
 						resourceManagerClient.releaseAssignedContainer(container.getId());
 						// and ask for a new one
-						requestYarnContainer(container.getResource(), container.getPriority());
+						requestYarnContainerIfRequired(container.getResource(), container.getPriority());
 					}
 				} else {
 					// return the excessive containers
@@ -446,21 +453,26 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 	/**
 	 * Request new container if pending containers cannot satisfies pending slot requests.
 	 */
+	private void requestYarnContainerIfRequired(Resource resource, Priority priority) {
+		int requiredTaskManagerSlots = getNumberRequiredTaskManagerSlots();
+		int pendingTaskManagerSlots = numPendingContainerRequests * numberOfTaskSlots;
+
+		if (requiredTaskManagerSlots > pendingTaskManagerSlots) {
+			requestYarnContainer(resource, priority);
+		}
+	}
+
 	private void requestYarnContainer(Resource resource, Priority priority) {
-		int pendingSlotRequests = getNumberPendingSlotRequests();
-		int pendingSlotAllocation = numPendingContainerRequests * numberOfTaskSlots;
-		if (pendingSlotRequests > pendingSlotAllocation) {
-			resourceManagerClient.addContainerRequest(new AMRMClient.ContainerRequest(resource, null, null, priority));
+		resourceManagerClient.addContainerRequest(new AMRMClient.ContainerRequest(resource, null, null, priority));
 
-			// make sure we transmit the request fast and receive fast news of granted allocations
-			resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
+		// make sure we transmit the request fast and receive fast news of granted allocations
+		resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
 
-			numPendingContainerRequests++;
+		numPendingContainerRequests++;
 
-			log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.",
-				resource,
-				numPendingContainerRequests);
-		}
+		log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.",
+			resource,
+			numPendingContainerRequests);
 	}
 
 	private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String containerId, String host)


[flink] 05/05: [hotfix] Start MesosWorkers with default ContaineredTaskManagerConfiguration

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e6639901e21ddae7a64d9ea3208d980424bd0ebf
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Sat Sep 22 14:13:42 2018 +0200

    [hotfix] Start MesosWorkers with default ContaineredTaskManagerConfiguration
---
 .../flink/mesos/runtime/clusterframework/MesosResourceManager.java    | 4 ++--
 .../mesos/runtime/clusterframework/MesosResourceManagerTest.java      | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index e7a5c98..d826773 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -357,7 +357,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 				switch(worker.state()) {
 					case Launched:
 						workersInLaunch.put(extractResourceID(worker.taskID()), worker);
-						final LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID(), worker.profile());
+						final LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID());
 						toAssign.add(new Tuple2<>(launchable.taskRequest(), worker.hostname().get()));
 						break;
 					case Released:
@@ -439,7 +439,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 			workerStore.putWorker(worker);
 			workersInNew.put(extractResourceID(worker.taskID()), worker);
 
-			LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID(), resourceProfile);
+			LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID());
 
 			LOG.info("Scheduling Mesos task {} with ({} MB, {} cpus).",
 				launchable.taskID().getValue(), launchable.taskRequest().getMemory(), launchable.taskRequest().getCPUs());
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index e21f0fc..5163724 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -242,7 +242,7 @@ public class MesosResourceManagerTest extends TestLogger {
 		TestingMesosResourceManager resourceManager;
 
 		// domain objects for test purposes
-		final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 1);
+		final ResourceProfile resourceProfile1 = ResourceProfile.UNKNOWN;
 
 		Protos.FrameworkID framework1 = Protos.FrameworkID.newBuilder().setValue("framework1").build();
 		public Protos.SlaveID slave1 = Protos.SlaveID.newBuilder().setValue("slave1").build();


[flink] 04/05: [hotfix] Remove mocking from SlotProtocolTest

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b0ba980caed9f26095bc8134bbe581635bb98dde
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Sep 21 17:24:49 2018 +0200

    [hotfix] Remove mocking from SlotProtocolTest
---
 .../slotmanager/SlotProtocolTest.java              | 46 +++++++++++-----------
 1 file changed, 23 insertions(+), 23 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 51e6b0b..66966cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -33,13 +33,13 @@ import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnect
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
@@ -50,12 +50,10 @@ import java.util.concurrent.TimeUnit;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
 
+/**
+ * Tests for the slot allocation protocol.
+ */
 public class SlotProtocolTest extends TestLogger {
 
 	private static final long timeout = 10000L;
@@ -92,7 +90,7 @@ public class SlotProtocolTest extends TestLogger {
 
 			final CompletableFuture<ResourceProfile> resourceProfileFuture = new CompletableFuture<>();
 			ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
-				.setAllocateResourceConsumer(resourceProfile -> resourceProfileFuture.complete(resourceProfile))
+				.setAllocateResourceConsumer(resourceProfileFuture::complete)
 				.build();
 
 			slotManager.start(rmLeaderID, Executors.directExecutor(), resourceManagerActions);
@@ -108,11 +106,13 @@ public class SlotProtocolTest extends TestLogger {
 			assertThat(resourceProfileFuture.get(), is(equalTo(slotRequest.getResourceProfile())));
 
 			// slot becomes available
-			TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
-			Mockito.when(
-				taskExecutorGateway
-					.requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(ResourceManagerId.class), any(Time.class)))
-				.thenReturn(mock(CompletableFuture.class));
+			final CompletableFuture<Tuple3<SlotID, JobID, AllocationID>> requestFuture = new CompletableFuture<>();
+			TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+				.setRequestSlotFunction(tuple5 -> {
+					requestFuture.complete(Tuple3.of(tuple5.f0, tuple5.f1, tuple5.f2));
+					return new CompletableFuture<>();
+				})
+				.createTestingTaskExecutorGateway();
 
 			final ResourceID resourceID = ResourceID.generate();
 			final SlotID slotID = new SlotID(resourceID, 0);
@@ -125,8 +125,7 @@ public class SlotProtocolTest extends TestLogger {
 			slotManager.registerTaskManager(new TaskExecutorConnection(resourceID, taskExecutorGateway), slotReport);
 
 			// 4) Slot becomes available and TaskExecutor gets a SlotRequest
-			verify(taskExecutorGateway, timeout(5000L))
-				.requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(ResourceManagerId.class), any(Time.class));
+			assertThat(requestFuture.get(), is(equalTo(Tuple3.of(slotID, jobID, allocationID))));
 		}
 	}
 
@@ -143,11 +142,13 @@ public class SlotProtocolTest extends TestLogger {
 
 		final ResourceManagerId rmLeaderID = ResourceManagerId.generate();
 
-		TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
-		Mockito.when(
-			taskExecutorGateway
-				.requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(ResourceManagerId.class), any(Time.class)))
-			.thenReturn(mock(CompletableFuture.class));
+		final CompletableFuture<Tuple3<SlotID, JobID, AllocationID>> requestFuture = new CompletableFuture<>();
+		TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+			.setRequestSlotFunction(tuple5 -> {
+				requestFuture.complete(Tuple3.of(tuple5.f0, tuple5.f1, tuple5.f2));
+				return new CompletableFuture<>();
+			})
+			.createTestingTaskExecutorGateway();
 
 		try (SlotManager slotManager = new SlotManager(
 			scheduledExecutor,
@@ -155,7 +156,7 @@ public class SlotProtocolTest extends TestLogger {
 			TestingUtils.infiniteTime(),
 			TestingUtils.infiniteTime())) {
 
-			ResourceActions resourceManagerActions = mock(ResourceActions.class);
+			ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();
 
 			slotManager.start(rmLeaderID, Executors.directExecutor(), resourceManagerActions);
 
@@ -178,8 +179,7 @@ public class SlotProtocolTest extends TestLogger {
 			slotManager.registerSlotRequest(slotRequest);
 
 			// a SlotRequest is routed to the TaskExecutor
-			verify(taskExecutorGateway, timeout(5000))
-				.requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(ResourceManagerId.class), any(Time.class));
+			assertThat(requestFuture.get(), is(equalTo(Tuple3.of(slotID, jobID, allocationID))));
 		}
 	}
 }


[flink] 03/05: [hotfix] Remove mocking from SlotManagerTest

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0c95396c05839447a75af6020896ed4733d1c5a7
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Sep 21 17:16:32 2018 +0200

    [hotfix] Remove mocking from SlotManagerTest
---
 .../slotmanager/SlotManagerTest.java               | 160 +++++++++------------
 1 file changed, 64 insertions(+), 96 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 854d27c..33a696a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -88,9 +88,7 @@ import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -107,9 +105,9 @@ public class SlotManagerTest extends TestLogger {
 	@Test
 	public void testTaskManagerRegistration() throws Exception {
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
-		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
+		final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();
 
-		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+		final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
 		final ResourceID resourceId = ResourceID.generate();
 		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
 
@@ -139,14 +137,12 @@ public class SlotManagerTest extends TestLogger {
 		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
 		final JobID jobId = new JobID();
 
-		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
-		when(taskExecutorGateway.requestSlot(
-			any(SlotID.class),
-			any(JobID.class),
-			any(AllocationID.class),
-			anyString(),
-			eq(resourceManagerId),
-			any(Time.class))).thenReturn(new CompletableFuture<>());
+		final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+			.setRequestSlotFunction(tuple5 -> {
+				assertThat(tuple5.f4, is(equalTo(resourceManagerId)));
+				return new CompletableFuture<>();
+			})
+			.createTestingTaskExecutorGateway();
 
 		final ResourceID resourceId = ResourceID.generate();
 		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
@@ -232,8 +228,11 @@ public class SlotManagerTest extends TestLogger {
 			resourceProfile,
 			"localhost");
 
-		ResourceActions resourceManagerActions = mock(ResourceActions.class);
-		doThrow(new ResourceManagerException("Test exception")).when(resourceManagerActions).allocateResource(any(ResourceProfile.class));
+		ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
+			.setAllocateResourceFunction(value -> {
+				throw new ResourceManagerException("Test exception");
+			})
+			.build();
 
 		try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
 
@@ -264,19 +263,17 @@ public class SlotManagerTest extends TestLogger {
 			resourceProfile,
 			targetAddress);
 
-		ResourceActions resourceManagerActions = mock(ResourceActions.class);
+		ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();
 
 		try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
-
+			final CompletableFuture<Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId>> requestFuture = new CompletableFuture<>();
 			// accept an incoming slot request
-			final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
-			when(taskExecutorGateway.requestSlot(
-				eq(slotId),
-				eq(jobId),
-				eq(allocationId),
-				anyString(),
-				eq(resourceManagerId),
-				any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+			final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+				.setRequestSlotFunction(tuple5 -> {
+					requestFuture.complete(Tuple5.of(tuple5.f0, tuple5.f1, tuple5.f2, tuple5.f3, tuple5.f4));
+					return CompletableFuture.completedFuture(Acknowledge.get());
+				})
+				.createTestingTaskExecutorGateway();
 
 			final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
 
@@ -289,7 +286,7 @@ public class SlotManagerTest extends TestLogger {
 
 			assertTrue("The slot request should be accepted", slotManager.registerSlotRequest(slotRequest));
 
-			verify(taskExecutorGateway).requestSlot(eq(slotId), eq(jobId), eq(allocationId), eq(targetAddress), eq(resourceManagerId), any(Time.class));
+			assertThat(requestFuture.get(), is(equalTo(Tuple5.of(slotId, jobId, allocationId, targetAddress, resourceManagerId))));
 
 			TaskManagerSlot slot = slotManager.getSlot(slotId);
 
@@ -309,14 +306,9 @@ public class SlotManagerTest extends TestLogger {
 		final SlotID slotId = new SlotID(resourceID, 0);
 		final AllocationID allocationId = new AllocationID();
 
-		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
-		when(taskExecutorGateway.requestSlot(
-			any(SlotID.class),
-			any(JobID.class),
-			any(AllocationID.class),
-			anyString(),
-			eq(resourceManagerId),
-			any(Time.class))).thenReturn(new CompletableFuture<>());
+		final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+			.setRequestSlotFunction(slotIDJobIDAllocationIDStringResourceManagerIdTuple5 -> new CompletableFuture<>())
+			.createTestingTaskExecutorGateway();
 
 		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
 		final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
@@ -369,15 +361,14 @@ public class SlotManagerTest extends TestLogger {
 			.setAllocateResourceConsumer(ignored -> numberAllocateResourceCalls.incrementAndGet())
 			.build();
 
+		final CompletableFuture<Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId>> requestFuture = new CompletableFuture<>();
 		// accept an incoming slot request
-		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
-		when(taskExecutorGateway.requestSlot(
-			eq(slotId),
-			eq(jobId),
-			eq(allocationId),
-			anyString(),
-			eq(resourceManagerId),
-			any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+		final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+			.setRequestSlotFunction(tuple5 -> {
+				requestFuture.complete(Tuple5.of(tuple5.f0, tuple5.f1, tuple5.f2, tuple5.f3, tuple5.f4));
+				return CompletableFuture.completedFuture(Acknowledge.get());
+			})
+			.createTestingTaskExecutorGateway();
 
 		final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
 
@@ -394,7 +385,7 @@ public class SlotManagerTest extends TestLogger {
 				taskExecutorConnection,
 				slotReport);
 
-			verify(taskExecutorGateway).requestSlot(eq(slotId), eq(jobId), eq(allocationId), eq(targetAddress), eq(resourceManagerId), any(Time.class));
+			assertThat(requestFuture.get(), is(equalTo(Tuple5.of(slotId, jobId, allocationId, targetAddress, resourceManagerId))));
 
 			TaskManagerSlot slot = slotManager.getSlot(slotId);
 
@@ -510,21 +501,17 @@ public class SlotManagerTest extends TestLogger {
 	@Test
 	public void testDuplicatePendingSlotRequestAfterSuccessfulAllocation() throws Exception {
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
-		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
+		final AtomicInteger allocateResourceCalls = new AtomicInteger(0);
+		final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
+			.setAllocateResourceConsumer(resourceProfile -> allocateResourceCalls.incrementAndGet())
+			.build();
 		final AllocationID allocationId = new AllocationID();
 		final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 2);
 		final ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 1);
 		final SlotRequest slotRequest1 = new SlotRequest(new JobID(), allocationId, resourceProfile1, "foobar");
 		final SlotRequest slotRequest2 = new SlotRequest(new JobID(), allocationId, resourceProfile2, "barfoo");
 
-		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
-		when(taskExecutorGateway.requestSlot(
-			any(SlotID.class),
-			any(JobID.class),
-			any(AllocationID.class),
-			anyString(),
-			eq(resourceManagerId),
-			any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+		final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
 
 		final ResourceID resourceID = ResourceID.generate();
 
@@ -547,7 +534,7 @@ public class SlotManagerTest extends TestLogger {
 
 		// check that we have only called the resource allocation only for the first slot request,
 		// since the second request is a duplicate
-		verify(resourceManagerActions, never()).allocateResource(any(ResourceProfile.class));
+		assertThat(allocateResourceCalls.get(), is(0));
 	}
 
 	/**
@@ -557,21 +544,17 @@ public class SlotManagerTest extends TestLogger {
 	@Test
 	public void testAcceptingDuplicateSlotRequestAfterAllocationRelease() throws Exception {
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
-		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
+		final AtomicInteger allocateResourceCalls = new AtomicInteger(0);
+		final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
+			.setAllocateResourceConsumer(resourceProfile -> allocateResourceCalls.incrementAndGet())
+			.build();
 		final AllocationID allocationId = new AllocationID();
 		final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 2);
 		final ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 1);
 		final SlotRequest slotRequest1 = new SlotRequest(new JobID(), allocationId, resourceProfile1, "foobar");
 		final SlotRequest slotRequest2 = new SlotRequest(new JobID(), allocationId, resourceProfile2, "barfoo");
 
-		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
-		when(taskExecutorGateway.requestSlot(
-			any(SlotID.class),
-			any(JobID.class),
-			any(AllocationID.class),
-			anyString(),
-			eq(resourceManagerId),
-			any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+		final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
 
 		final ResourceID resourceID = ResourceID.generate();
 		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
@@ -601,7 +584,7 @@ public class SlotManagerTest extends TestLogger {
 
 		// check that we have only called the resource allocation only for the first slot request,
 		// since the second request is a duplicate
-		verify(resourceManagerActions, never()).allocateResource(any(ResourceProfile.class));
+		assertThat(allocateResourceCalls.get(), is(0));
 	}
 
 	/**
@@ -637,7 +620,7 @@ public class SlotManagerTest extends TestLogger {
 	@Test
 	public void testUpdateSlotReport() throws Exception {
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
-		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
+		final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build();
 
 		final JobID jobId = new JobID();
 		final AllocationID allocationId = new AllocationID();
@@ -691,13 +674,16 @@ public class SlotManagerTest extends TestLogger {
 	 */
 	@Test
 	public void testTaskManagerTimeout() throws Exception {
-		final long tmTimeout = 500L;
+		final long tmTimeout = 10L;
 
-		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
+		final CompletableFuture<InstanceID> releaseFuture = new CompletableFuture<>();
+		final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
+			.setReleaseResourceConsumer((instanceID, e) -> releaseFuture.complete(instanceID))
+			.build();
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 		final ResourceID resourceID = ResourceID.generate();
 
-		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+		final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
 		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
 
 		final SlotID slotId = new SlotID(resourceID, 0);
@@ -715,15 +701,9 @@ public class SlotManagerTest extends TestLogger {
 
 			slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions);
 
-			mainThreadExecutor.execute(new Runnable() {
-				@Override
-				public void run() {
-					slotManager.registerTaskManager(taskManagerConnection, slotReport);
-				}
-			});
+			mainThreadExecutor.execute(() -> slotManager.registerTaskManager(taskManagerConnection, slotReport));
 
-			verify(resourceManagerActions, timeout(100L * tmTimeout).times(1))
-				.releaseResource(eq(taskManagerConnection.getInstanceID()), any(Exception.class));
+			assertThat(releaseFuture.get(), is(equalTo(taskManagerConnection.getInstanceID())));
 		}
 	}
 
@@ -976,13 +956,12 @@ public class SlotManagerTest extends TestLogger {
 	@Test
 	public void testTimeoutForUnusedTaskManager() throws Exception {
 		final long taskManagerTimeout = 50L;
-		final long verifyTimeout = taskManagerTimeout * 10L;
 
-		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 		final CompletableFuture<InstanceID> releasedResourceFuture = new CompletableFuture<>();
 		final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
 			.setReleaseResourceConsumer((instanceID, e) -> releasedResourceFuture.complete(instanceID))
 			.build();
+		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 		final ScheduledExecutor scheduledExecutor = TestingUtils.defaultScheduledExecutor();
 
 		final ResourceID resourceId = ResourceID.generate();
@@ -992,14 +971,13 @@ public class SlotManagerTest extends TestLogger {
 		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
 		final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
 
-		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
-		when(taskExecutorGateway.requestSlot(
-			any(SlotID.class),
-			eq(jobId),
-			eq(allocationId),
-			anyString(),
-			eq(resourceManagerId),
-			any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+		final CompletableFuture<SlotID> requestedSlotFuture = new CompletableFuture<>();
+		final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+			.setRequestSlotFunction(tuple5 -> {
+				requestedSlotFuture.complete(tuple5.f0);
+				return CompletableFuture.completedFuture(Acknowledge.get());
+			})
+			.createTestingTaskExecutorGateway();
 
 		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
 
@@ -1028,17 +1006,9 @@ public class SlotManagerTest extends TestLogger {
 					}
 				},
 				mainThreadExecutor)
-			.thenAccept((Object value) -> slotManager.registerTaskManager(taskManagerConnection, initialSlotReport));
-
-			ArgumentCaptor<SlotID> slotIdArgumentCaptor = ArgumentCaptor.forClass(SlotID.class);
+			.thenRun(() -> slotManager.registerTaskManager(taskManagerConnection, initialSlotReport));
 
-			verify(taskExecutorGateway, timeout(verifyTimeout)).requestSlot(
-				slotIdArgumentCaptor.capture(),
-				eq(jobId),
-				eq(allocationId),
-				anyString(),
-				eq(resourceManagerId),
-				any(Time.class));
+			final SlotID slotId = requestedSlotFuture.get();
 
 			CompletableFuture<Boolean> idleFuture = CompletableFuture.supplyAsync(
 				() -> slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID()),
@@ -1047,8 +1017,6 @@ public class SlotManagerTest extends TestLogger {
 			// check that the TaskManager is not idle
 			assertFalse(idleFuture.get());
 
-			final SlotID slotId = slotIdArgumentCaptor.getValue();
-
 			CompletableFuture<TaskManagerSlot> slotFuture = CompletableFuture.supplyAsync(
 				() -> slotManager.getSlot(slotId),
 				mainThreadExecutor);


[flink] 02/05: [hotfix] Cancel actual pending slot request in SlotManager#updateSlotState

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fb50658c27ebbd14f9d240165e6b47f5229dddd4
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Sep 21 15:53:52 2018 +0200

    [hotfix] Cancel actual pending slot request in SlotManager#updateSlotState
---
 .../flink/runtime/resourcemanager/slotmanager/SlotManager.java      | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 2fa1844..2ef2b2f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -642,7 +642,11 @@ public class SlotManager implements AutoCloseable {
 						slot.updateAllocation(allocationId, jobId);
 
 						// remove the pending request if any as it has been assigned
-						pendingSlotRequests.remove(allocationId);
+						final PendingSlotRequest actualPendingSlotRequest = pendingSlotRequests.remove(allocationId);
+
+						if (actualPendingSlotRequest != null) {
+							cancelPendingSlotRequest(actualPendingSlotRequest);
+						}
 
 						// this will try to find a new slot for the request
 						rejectPendingSlotRequest(