You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/12 19:05:27 UTC

[flink] branch master updated (6cf98b6 -> d56c45a)

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 6cf98b6  [FLINK-12804] Change mailbox implementation from bounded to unbounded
     new 0421594  [FLINK-12766][runtime] Introduce merge and subtract calculations and tests for ResourceProfile.
     new f6857a4  [FLINK-12765][coordinator] Change the default resource spec to UNKNOWN
     new c5072df  [hotfix] [tests] Checkstyle fixes and minor code cleanup in ResourceProfileTest and ResourceSpecTest
     new 2947015  [FLINK-12766][runtime] Fix bug in merging and converting UNKNOWN ResourceSpecs.
     new e6ef677  [hotfix][runtime] Preserve singleton property of UNKNOWN ResourceSpec and ResourceProfile
     new 5654e0f  [hotfix][core] Minor cleanups to the ResourceSpec class
     new faa839d  [FLINK-12765][coordinator] Disable jobs where some, but not all, vertices have configured resource profiles
     new bcec7d7  [FLINK-12765][jobmanager] Keep track of the resources used in the task slot hierarchy
     new a587576  [FLINK-12765][jobmanager] Let some slot requests fail if the sharing slot is oversubscribed
     new d56c45a  [FLINK-13250][blink runner] Make sure that all nodes have a concrete resource profile

The 10 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/api/common/operators/ResourceSpec.java   |  57 +++-
 .../flink/api/common/resources/Resource.java       |  35 ++-
 .../api/common/operators/ResourceSpecTest.java     |  73 +++++-
 .../clusterframework/types/ResourceProfile.java    | 142 +++++++++-
 .../flink/runtime/dispatcher/Dispatcher.java       |  25 ++
 .../runtime/jobmaster/slotpool/SchedulerImpl.java  |  54 +++-
 ...java => SharedSlotOversubscribedException.java} |  32 +--
 .../jobmaster/slotpool/SlotSharingManager.java     | 138 +++++++++-
 .../types/ResourceProfileTest.java                 | 125 ++++++++-
 .../flink/runtime/dispatcher/DispatcherTest.java   |  40 +++
 .../jobmaster/slotpool/SlotPoolCoLocationTest.java | 162 ++++++++++++
 .../slotpool/SlotPoolSlotSharingTest.java          |  92 +++++++
 .../jobmaster/slotpool/SlotSharingManagerTest.java | 291 +++++++++++++++++++++
 .../apache/flink/table/executor/BatchExecutor.java |   4 +-
 14 files changed, 1206 insertions(+), 64 deletions(-)
 mode change 100644 => 100755 flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
 mode change 100644 => 100755 flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java
 mode change 100644 => 100755 flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
 copy flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/{AllocatedSlotActions.java => SharedSlotOversubscribedException.java} (60%)
 mode change 100644 => 100755 flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
 mode change 100644 => 100755 flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java


[flink] 06/10: [hotfix][core] Minor cleanups to the ResourceSpec class

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5654e0f4750cd9fbe7cba2bdebd16ea3cb4011d6
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Jul 11 17:40:41 2019 +0200

    [hotfix][core] Minor cleanups to the ResourceSpec class
    
      - Some JavaDoc comments
      - Make the class final, because several methods are not designed to handle inheritance well.
      - Avoid repeated string concatenation/building
---
 .../apache/flink/api/common/operators/ResourceSpec.java   | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
index dc6a611..2d56746 100755
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
@@ -50,12 +50,19 @@ import static org.apache.flink.util.Preconditions.checkArgument;
  * </ol>
  */
 @Internal
-public class ResourceSpec implements Serializable {
+public final class ResourceSpec implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 
+	/**
+	 * A ResourceSpec that indicates an unknown set of resources.
+	 */
 	public static final ResourceSpec UNKNOWN = new ResourceSpec();
 
+	/**
+	 * The default ResourceSpec used for operators and transformation functions.
+	 * Currently equal to {@link #UNKNOWN}.
+	 */
 	public static final ResourceSpec DEFAULT = UNKNOWN;
 
 	public static final String GPU_NAME = "GPU";
@@ -91,7 +98,7 @@ public class ResourceSpec implements Serializable {
 	 * @param managedMemoryInMB The size of managed memory, in megabytes.
 	 * @param extendedResources The extended resources, associated with the resource manager used
 	 */
-	protected ResourceSpec(
+	private ResourceSpec(
 			double cpuCores,
 			int heapMemoryInMB,
 			int directMemoryInMB,
@@ -273,9 +280,9 @@ public class ResourceSpec implements Serializable {
 
 	@Override
 	public String toString() {
-		String extend = "";
+		StringBuilder extend = new StringBuilder();
 		for (Resource resource : extendedResources.values()) {
-			extend += ", " + resource.getName() + "=" + resource.getValue();
+			extend.append(", ").append(resource.getName()).append("=").append(resource.getValue());
 		}
 		return "ResourceSpec{" +
 				"cpuCores=" + cpuCores +


[flink] 07/10: [FLINK-12765][coordinator] Disable jobs where some, but not all, vertices have configured resource profiles

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit faa839d4acde330831930a6fc23713f1d18a53e5
Author: Gao Yun <yu...@alibaba-inc.com>
AuthorDate: Tue Jun 25 23:37:05 2019 +0800

    [FLINK-12765][coordinator] Disable jobs where some, but not all, vertices have configured resource profiles
---
 .../flink/runtime/dispatcher/Dispatcher.java       | 25 ++++++++++++++
 .../flink/runtime/dispatcher/DispatcherTest.java   | 40 ++++++++++++++++++++++
 2 files changed, 65 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 6f8f27d..4909885 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
@@ -35,6 +36,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
@@ -265,6 +267,10 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 			if (isDuplicateJob(jobGraph.getJobID())) {
 				return FutureUtils.completedExceptionally(
 					new JobSubmissionException(jobGraph.getJobID(), "Job has already been submitted."));
+			} else if (isPartialResourceConfigured(jobGraph)) {
+				return FutureUtils.completedExceptionally(
+					new JobSubmissionException(jobGraph.getJobID(), "Currently jobs is not supported if parts of the vertices have " +
+							"resources configured. The limitation will be removed in future versions."));
 			} else {
 				return internalSubmitJob(jobGraph);
 			}
@@ -292,6 +298,25 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 		return jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE || jobManagerRunnerFutures.containsKey(jobId);
 	}
 
+	private boolean isPartialResourceConfigured(JobGraph jobGraph) {
+		boolean hasVerticesWithUnknownResource = false;
+		boolean hasVerticesWithConfiguredResource = false;
+
+		for (JobVertex jobVertex : jobGraph.getVertices()) {
+			if (jobVertex.getMinResources() == ResourceSpec.UNKNOWN) {
+				hasVerticesWithUnknownResource = true;
+			} else {
+				hasVerticesWithConfiguredResource = true;
+			}
+
+			if (hasVerticesWithUnknownResource && hasVerticesWithConfiguredResource) {
+				return true;
+			}
+		}
+
+		return false;
+	}
+
 	private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
 		log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
old mode 100644
new mode 100755
index 554e7a0..714c0b2
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
@@ -28,6 +29,7 @@ import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
+import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -270,6 +272,44 @@ public class DispatcherTest extends TestLogger {
 	}
 
 	/**
+	 * Tests that submitting a job in which only some of the vertices have
+	 * configured resources fails with a JobSubmissionException.
+	 */
+	@Test
+	public void testJobSubmissionWithPartialResourceConfigured() throws Exception {
+		dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
+
+		CompletableFuture<UUID> leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
+
+		// wait for the leader to be elected
+		leaderFuture.get();
+
+		DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+
+		ResourceSpec resourceSpec = ResourceSpec.newBuilder().setCpuCores(2).build();
+
+		final JobVertex firstVertex = new JobVertex("firstVertex");
+		firstVertex.setInvokableClass(NoOpInvokable.class);
+		firstVertex.setResources(resourceSpec, resourceSpec);
+
+		final JobVertex secondVertex = new JobVertex("secondVertex");
+		secondVertex.setInvokableClass(NoOpInvokable.class);
+
+		JobGraph jobGraphWithTwoVertices = new JobGraph(TEST_JOB_ID, "twoVerticesJob", firstVertex, secondVertex);
+		jobGraphWithTwoVertices.setAllowQueuedScheduling(true);
+
+		CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraphWithTwoVertices, TIMEOUT);
+
+		try {
+			acknowledgeFuture.get();
+			fail("job submission should have failed");
+		}
+		catch (ExecutionException e) {
+			assertTrue(ExceptionUtils.findThrowable(e, JobSubmissionException.class).isPresent());
+		}
+	}
+
+	/**
 	 * Tests that the dispatcher takes part in the leader election.
 	 */
 	@Test


[flink] 10/10: [FLINK-13250][blink runner] Make sure that all nodes have a concrete resource profile

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d56c45a090de689a01fc8ddb884e6baa29568322
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri Jul 12 18:28:36 2019 +0200

    [FLINK-13250][blink runner] Make sure that all nodes have a concrete resource profile
    
    This change is covered by various existing integration tests that failed prior to this fix.
---
 .../src/main/java/org/apache/flink/table/executor/BatchExecutor.java  | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
index 8a90b80..f4bb2ee 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
@@ -84,7 +84,9 @@ public class BatchExecutor extends ExecutorBase {
 		// All transformations should set managed memory size.
 		ResourceSpec managedResourceSpec = NodeResourceUtil.fromManagedMem(0);
 		streamGraph.getStreamNodes().forEach(sn -> {
-			sn.setResources(sn.getMinResources().merge(managedResourceSpec), sn.getPreferredResources().merge(managedResourceSpec));
+			if (sn.getMinResources().equals(ResourceSpec.DEFAULT)) {
+				sn.setResources(managedResourceSpec, managedResourceSpec);
+			}
 		});
 		streamGraph.setChaining(true);
 		streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES);


[flink] 02/10: [FLINK-12765][coordinator] Change the default resource spec to UNKNOWN

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f6857a4660eb84c07425594d4fb36df57761aa1a
Author: Gao Yun <yu...@alibaba-inc.com>
AuthorDate: Wed Jul 10 15:16:16 2019 +0800

    [FLINK-12765][coordinator] Change the default resource spec to UNKNOWN
---
 .../flink/api/common/operators/ResourceSpec.java   | 29 +++++++++++++++++++++-
 .../clusterframework/types/ResourceProfile.java    |  4 +++
 2 files changed, 32 insertions(+), 1 deletion(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
index 5b1b7b1..4ce5911 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
@@ -29,6 +29,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * Describe the different resource factors of the operator with UDF.
  *
@@ -52,7 +54,9 @@ public class ResourceSpec implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 
-	public static final ResourceSpec DEFAULT = new ResourceSpec(0, 0, 0, 0, 0, 0);
+	public static final ResourceSpec UNKNOWN = new ResourceSpec();
+
+	public static final ResourceSpec DEFAULT = UNKNOWN;
 
 	public static final String GPU_NAME = "GPU";
 
@@ -95,6 +99,13 @@ public class ResourceSpec implements Serializable {
 			int stateSizeInMB,
 			int managedMemoryInMB,
 			Resource... extendedResources) {
+		checkArgument(cpuCores >= 0, "The cpu cores of the resource spec should not be negative.");
+		checkArgument(heapMemoryInMB >= 0, "The heap memory of the resource spec should not be negative");
+		checkArgument(directMemoryInMB >= 0, "The direct memory of the resource spec should not be negative");
+		checkArgument(nativeMemoryInMB >= 0, "The native memory of the resource spec should not be negative");
+		checkArgument(stateSizeInMB >= 0, "The state size of the resource spec should not be negative");
+		checkArgument(managedMemoryInMB >= 0, "The managed memory of the resource spec should not be negative");
+
 		this.cpuCores = cpuCores;
 		this.heapMemoryInMB = heapMemoryInMB;
 		this.directMemoryInMB = directMemoryInMB;
@@ -109,6 +120,18 @@ public class ResourceSpec implements Serializable {
 	}
 
 	/**
+	 * Creates a new ResourceSpec with all fields unknown.
+	 */
+	private ResourceSpec() {
+		this.cpuCores = -1;
+		this.heapMemoryInMB = -1;
+		this.directMemoryInMB = -1;
+		this.nativeMemoryInMB = -1;
+		this.stateSizeInMB = -1;
+		this.managedMemoryInMB = -1;
+	}
+
+	/**
 	 * Used by system internally to merge the other resources of chained operators
 	 * when generating the job graph or merge the resource consumed by state backend.
 	 *
@@ -116,6 +139,10 @@ public class ResourceSpec implements Serializable {
 	 * @return The new resource with merged values.
 	 */
 	public ResourceSpec merge(ResourceSpec other) {
+		if (this == UNKNOWN || other == UNKNOWN) {
+			return UNKNOWN;
+		}
+
 		ResourceSpec target = new ResourceSpec(
 				Math.max(this.cpuCores, other.cpuCores),
 				this.heapMemoryInMB + other.heapMemoryInMB,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index 0a02cd0..840825a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -445,6 +445,10 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	}
 
 	public static ResourceProfile fromResourceSpec(ResourceSpec resourceSpec, int networkMemory) {
+		if (resourceSpec == ResourceSpec.UNKNOWN) {
+			return UNKNOWN;
+		}
+
 		Map<String, Resource> copiedExtendedResources = new HashMap<>(resourceSpec.getExtendedResources());
 
 		return new ResourceProfile(


[flink] 03/10: [hotfix] [tests] Checkstyle fixes and minor code cleanup in ResourceProfileTest and ResourceSpecTest

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c5072dfd17aa8340c66a29e5b8f402d80cd6bb76
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Jul 11 16:15:51 2019 +0200

    [hotfix] [tests] Checkstyle fixes and minor code cleanup in ResourceProfileTest and ResourceSpecTest
---
 .../api/common/operators/ResourceSpecTest.java     | 17 +++++------
 .../types/ResourceProfileTest.java                 | 33 ++++++++++++----------
 2 files changed, 27 insertions(+), 23 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java
old mode 100644
new mode 100755
index 5f1e7d1..c6aa04b
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java
@@ -18,13 +18,14 @@
 
 package org.apache.flink.api.common.operators;
 
-import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -80,8 +81,8 @@ public class ResourceSpecTest extends TestLogger {
 	public void testEquals() throws Exception {
 		ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
 		ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
-		assertTrue(rs1.equals(rs2));
-		assertTrue(rs2.equals(rs1));
+		assertEquals(rs1, rs2);
+		assertEquals(rs2, rs1);
 
 		ResourceSpec rs3 = ResourceSpec.newBuilder().
 				setCpuCores(1.0).
@@ -93,14 +94,14 @@ public class ResourceSpecTest extends TestLogger {
 				setHeapMemoryInMB(100).
 				setGPUResource(1).
 				build();
-		assertFalse(rs3.equals(rs4));
+		assertNotEquals(rs3, rs4);
 
 		ResourceSpec rs5 = ResourceSpec.newBuilder().
 				setCpuCores(1.0).
 				setHeapMemoryInMB(100).
 				setGPUResource(2.2).
 				build();
-		assertTrue(rs3.equals(rs5));
+		assertEquals(rs3, rs5);
 	}
 
 	@Test
@@ -119,7 +120,7 @@ public class ResourceSpecTest extends TestLogger {
 				setHeapMemoryInMB(100).
 				setGPUResource(1).
 				build();
-		assertFalse(rs3.hashCode() == rs4.hashCode());
+		assertNotEquals(rs3.hashCode(), rs4.hashCode());
 
 		ResourceSpec rs5 = ResourceSpec.newBuilder().
 				setCpuCores(1.0).
@@ -152,8 +153,8 @@ public class ResourceSpecTest extends TestLogger {
 				setHeapMemoryInMB(100).
 				setGPUResource(1.1).
 				build();
-		byte[] buffer = InstantiationUtil.serializeObject(rs1);
-		ResourceSpec rs2 = InstantiationUtil.deserializeObject(buffer, ClassLoader.getSystemClassLoader());
+
+		ResourceSpec rs2 = CommonTestUtils.createCopySerializable(rs1);
 		assertEquals(rs1, rs2);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
old mode 100644
new mode 100755
index f80234b..b608ca1
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
@@ -27,9 +27,13 @@ import java.util.Collections;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Tests for the {@link ResourceProfile}.
+ */
 public class ResourceProfileTest {
 
 	@Test
@@ -81,7 +85,7 @@ public class ResourceProfileTest {
 	public void testEquals() throws Exception {
 		ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
 		ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
-		assertTrue(ResourceProfile.fromResourceSpec(rs1, 0).equals(ResourceProfile.fromResourceSpec(rs2, 0)));
+		assertEquals(ResourceProfile.fromResourceSpec(rs1, 0), ResourceProfile.fromResourceSpec(rs2, 0));
 
 		ResourceSpec rs3 = ResourceSpec.newBuilder().
 				setCpuCores(1.0).
@@ -93,14 +97,14 @@ public class ResourceProfileTest {
 				setHeapMemoryInMB(100).
 				setGPUResource(1.1).
 				build();
-		assertFalse(ResourceProfile.fromResourceSpec(rs3, 0).equals(ResourceProfile.fromResourceSpec(rs4, 0)));
+		assertNotEquals(ResourceProfile.fromResourceSpec(rs3, 0), ResourceProfile.fromResourceSpec(rs4, 0));
 
 		ResourceSpec rs5 = ResourceSpec.newBuilder().
 				setCpuCores(1.0).
 				setHeapMemoryInMB(100).
 				setGPUResource(2.2).
 				build();
-		assertTrue(ResourceProfile.fromResourceSpec(rs3, 100).equals(ResourceProfile.fromResourceSpec(rs5, 100)));
+		assertEquals(ResourceProfile.fromResourceSpec(rs3, 100), ResourceProfile.fromResourceSpec(rs5, 100));
 
 		ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, 100, 100, Collections.emptyMap());
 		ResourceProfile rp2 = new ResourceProfile(1.1, 100, 100, 100, 100, 100, Collections.emptyMap());
@@ -111,13 +115,13 @@ public class ResourceProfileTest {
 		ResourceProfile rp7 = new ResourceProfile(1.0, 100, 100, 100, 100, 110, Collections.emptyMap());
 		ResourceProfile rp8 = new ResourceProfile(1.0, 100, 100, 100, 100, 100, Collections.emptyMap());
 
-		assertFalse(rp1.equals(rp2));
-		assertFalse(rp1.equals(rp3));
-		assertFalse(rp1.equals(rp4));
-		assertFalse(rp1.equals(rp5));
-		assertFalse(rp1.equals(rp6));
-		assertFalse(rp1.equals(rp7));
-		assertTrue(rp1.equals(rp8));
+		assertNotEquals(rp1, rp2);
+		assertNotEquals(rp1, rp3);
+		assertNotEquals(rp1, rp4);
+		assertNotEquals(rp1, rp5);
+		assertNotEquals(rp1, rp6);
+		assertNotEquals(rp1, rp7);
+		assertEquals(rp1, rp8);
 	}
 
 	@Test
@@ -142,7 +146,6 @@ public class ResourceProfileTest {
 		assertEquals(1, ResourceProfile.fromResourceSpec(rs3, 0).compareTo(ResourceProfile.fromResourceSpec(rs4, 0)));
 		assertEquals(-1, ResourceProfile.fromResourceSpec(rs4, 0).compareTo(ResourceProfile.fromResourceSpec(rs3, 0)));
 
-
 		ResourceSpec rs5 = ResourceSpec.newBuilder().
 				setCpuCores(1.0).
 				setHeapMemoryInMB(100).
@@ -194,11 +197,11 @@ public class ResourceProfileTest {
 
 	@Test
 	public void testMergeWithOverflow() throws Exception {
-		final double LARGE_DOUBLE = Double.MAX_VALUE - 1.0;
-		final int LARGE_INTEGER = Integer.MAX_VALUE - 100;
+		final double largeDouble = Double.MAX_VALUE - 1.0;
+		final int largeInteger = Integer.MAX_VALUE - 100;
 
 		ResourceProfile rp1 = new ResourceProfile(3.0, 300, 300, 300, 300, 300, Collections.emptyMap());
-		ResourceProfile rp2 = new ResourceProfile(LARGE_DOUBLE, LARGE_INTEGER, LARGE_INTEGER, LARGE_INTEGER, LARGE_INTEGER, LARGE_INTEGER, Collections.emptyMap());
+		ResourceProfile rp2 = new ResourceProfile(largeDouble, largeInteger, largeInteger, largeInteger, largeInteger, largeInteger, Collections.emptyMap());
 
 		assertEquals(ResourceProfile.ANY, rp2.merge(rp2));
 		assertEquals(ResourceProfile.ANY, rp2.merge(rp1));
@@ -220,7 +223,7 @@ public class ResourceProfileTest {
 		assertEquals(rp5, rp4.subtract(rp1));
 
 		try {
-			ResourceProfile ignored = rp1.subtract(rp2);
+			rp1.subtract(rp2);
 			fail("The subtract should failed due to trying to subtract a larger resource");
 		} catch (IllegalArgumentException ex) {
 			// Ignore ex.


[flink] 08/10: [FLINK-12765][jobmanager] Keep track of the resources used in the task slot hierarchy

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bcec7d711591bdbafe9ec460e68e0f609740b9d4
Author: Gao Yun <yu...@alibaba-inc.com>
AuthorDate: Mon Jun 17 08:22:41 2019 +0800

    [FLINK-12765][jobmanager] Keep track of the resources used in the task slot hierarchy
---
 .../clusterframework/types/ResourceProfile.java    |  4 ++
 .../runtime/jobmaster/slotpool/SchedulerImpl.java  |  1 +
 .../jobmaster/slotpool/SlotSharingManager.java     | 49 +++++++++++++++
 .../jobmaster/slotpool/SlotSharingManagerTest.java | 72 ++++++++++++++++++++++
 4 files changed, 126 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index 7ac1dd4..c373fbd 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -51,11 +51,15 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 
 	private static final long serialVersionUID = 1L;
 
+	/** A ResourceProfile that indicates an unknown set of resources. */
 	public static final ResourceProfile UNKNOWN = new ResourceProfile();
 
 	/** ResourceProfile which matches any other ResourceProfile. */
 	public static final ResourceProfile ANY = new ResourceProfile(Double.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Collections.emptyMap());
 
+	/** A ResourceProfile describing zero resources. */
+	public static final ResourceProfile ZERO = new ResourceProfile(0, 0);
+
 	// ------------------------------------------------------------------------
 
 	/** How many cpu cores are needed, use double so we can specify cpu like 0.1. */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
index b8d8540..57c42b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
@@ -320,6 +320,7 @@ public class SchedulerImpl implements Scheduler {
 
 		final SlotSharingManager.SingleTaskSlot leaf = multiTaskSlotLocality.getMultiTaskSlot().allocateSingleTaskSlot(
 			slotRequestId,
+			slotProfile.getResourceProfile(),
 			scheduledUnit.getJobVertexId(),
 			multiTaskSlotLocality.getLocality());
 		return leaf.getLogicalSlotFuture();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
index f5d0dec..a7afbac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
@@ -293,6 +294,11 @@ public class SlotSharingManager {
 		 * @param cause for the release
 		 */
 		public abstract void release(Throwable cause);
+
+		/**
+		 * Gets the total reserved resources of the slot and its descendants.
+		 */
+		public abstract ResourceProfile getReservedResources();
 	}
 
 	/**
@@ -316,6 +322,9 @@ public class SlotSharingManager {
 		// true if we are currently releasing our children
 		private boolean releasingChildren;
 
+		// the total resources reserved by all the descendants.
+		private ResourceProfile reservedResources;
+
 		private MultiTaskSlot(
 				SlotRequestId slotRequestId,
 				AbstractID groupId,
@@ -355,6 +364,8 @@ public class SlotSharingManager {
 			this.children = new HashMap<>(16);
 			this.releasingChildren = false;
 
+			this.reservedResources = ResourceProfile.ZERO;
+
 			slotContextFuture.whenComplete(
 				(SlotContext ignored, Throwable throwable) -> {
 					if (throwable != null) {
@@ -404,6 +415,7 @@ public class SlotSharingManager {
 		 */
 		SingleTaskSlot allocateSingleTaskSlot(
 				SlotRequestId slotRequestId,
+				ResourceProfile resourceProfile,
 				AbstractID groupId,
 				Locality locality) {
 			Preconditions.checkState(!super.contains(groupId));
@@ -412,6 +424,7 @@ public class SlotSharingManager {
 
 			final SingleTaskSlot leaf = new SingleTaskSlot(
 				slotRequestId,
+				resourceProfile,
 				groupId,
 				this,
 				locality);
@@ -421,6 +434,8 @@ public class SlotSharingManager {
 			// register the newly allocated slot also at the SlotSharingManager
 			allTaskSlots.put(slotRequestId, leaf);
 
+			reserveResource(resourceProfile);
+
 			return leaf;
 		}
 
@@ -490,6 +505,11 @@ public class SlotSharingManager {
 			}
 		}
 
+		@Override
+		public ResourceProfile getReservedResources() {
+			return reservedResources;
+		}
+
 		/**
 		 * Releases the child with the given childGroupId.
 		 *
@@ -501,6 +521,9 @@ public class SlotSharingManager {
 
 				if (child != null) {
 					allTaskSlots.remove(child.getSlotRequestId());
+
+					// Update the resources of this slot and the parents
+					releaseResource(child.getReservedResources());
 				}
 
 				if (children.isEmpty()) {
@@ -509,6 +532,22 @@ public class SlotSharingManager {
 			}
 		}
 
+		private void reserveResource(ResourceProfile resourceProfile) {
+			reservedResources = reservedResources.merge(resourceProfile);
+
+			if (parent != null) {
+				parent.reserveResource(resourceProfile);
+			}
+		}
+
+		private void releaseResource(ResourceProfile resourceProfile) {
+			reservedResources = reservedResources.subtract(resourceProfile);
+
+			if (parent != null) {
+				parent.releaseResource(resourceProfile);
+			}
+		}
+
 		@Override
 		public String toString() {
 			String physicalSlotDescription;
@@ -539,13 +578,18 @@ public class SlotSharingManager {
 		// future containing a LogicalSlot which is completed once the underlying SlotContext future is completed
 		private final CompletableFuture<SingleLogicalSlot> singleLogicalSlotFuture;
 
+		// the resource profile of this slot.
+		private final ResourceProfile resourceProfile;
+
 		private SingleTaskSlot(
 				SlotRequestId slotRequestId,
+				ResourceProfile resourceProfile,
 				AbstractID groupId,
 				MultiTaskSlot parent,
 				Locality locality) {
 			super(slotRequestId, groupId);
 
+			this.resourceProfile = Preconditions.checkNotNull(resourceProfile);
 			this.parent = Preconditions.checkNotNull(parent);
 
 			Preconditions.checkNotNull(locality);
@@ -581,6 +625,11 @@ public class SlotSharingManager {
 		}
 
 		@Override
+		public ResourceProfile getReservedResources() {
+			return resourceProfile;
+		}
+
+		@Override
 		public String toString() {
 			String logicalSlotString = "(pending)";
 			try {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
index 8e8db1a..0742ac2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobmaster.slotpool;
 
+import org.apache.flink.api.common.resources.GPUResource;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
@@ -135,6 +136,7 @@ public class SlotSharingManagerTest extends TestLogger {
 		SlotRequestId singleTaskSlotRequestId = new SlotRequestId();
 		SlotSharingManager.SingleTaskSlot singleTaskSlot = rootSlot.allocateSingleTaskSlot(
 			singleTaskSlotRequestId,
+			ResourceProfile.UNKNOWN,
 			singleTaskSlotGroupId,
 			Locality.LOCAL);
 
@@ -180,6 +182,7 @@ public class SlotSharingManagerTest extends TestLogger {
 		SlotRequestId singleTaskSlotRequestId = new SlotRequestId();
 		SlotSharingManager.SingleTaskSlot singleTaskSlot = rootSlot.allocateSingleTaskSlot(
 			singleTaskSlotRequestId,
+			ResourceProfile.UNKNOWN,
 			new AbstractID(),
 			Locality.LOCAL);
 
@@ -237,6 +240,7 @@ public class SlotSharingManagerTest extends TestLogger {
 
 		SlotSharingManager.SingleTaskSlot singleTaskSlot1 = multiTaskSlot.allocateSingleTaskSlot(
 			new SlotRequestId(),
+			ResourceProfile.UNKNOWN,
 			new AbstractID(),
 			Locality.LOCAL);
 
@@ -284,12 +288,14 @@ public class SlotSharingManagerTest extends TestLogger {
 		Locality locality1 = Locality.LOCAL;
 		SlotSharingManager.SingleTaskSlot singleTaskSlot1 = rootSlot.allocateSingleTaskSlot(
 			new SlotRequestId(),
+			ResourceProfile.UNKNOWN,
 			new AbstractID(),
 			locality1);
 
 		Locality locality2 = Locality.HOST_LOCAL;
 		SlotSharingManager.SingleTaskSlot singleTaskSlot2 = rootSlot.allocateSingleTaskSlot(
 			new SlotRequestId(),
+			ResourceProfile.UNKNOWN,
 			new AbstractID(),
 			locality2);
 
@@ -314,6 +320,7 @@ public class SlotSharingManagerTest extends TestLogger {
 		Locality locality3 = Locality.NON_LOCAL;
 		SlotSharingManager.SingleTaskSlot singleTaskSlot3 = rootSlot.allocateSingleTaskSlot(
 			new SlotRequestId(),
+			ResourceProfile.UNKNOWN,
 			new AbstractID(),
 			locality3);
 
@@ -349,6 +356,7 @@ public class SlotSharingManagerTest extends TestLogger {
 
 		SlotSharingManager.SingleTaskSlot singleTaskSlot = rootSlot.allocateSingleTaskSlot(
 			new SlotRequestId(),
+			ResourceProfile.UNKNOWN,
 			new AbstractID(),
 			Locality.LOCAL);
 
@@ -435,6 +443,7 @@ public class SlotSharingManagerTest extends TestLogger {
 		// occupy the resolved root slot
 		resolvedMultiTaskSlot.allocateSingleTaskSlot(
 			new SlotRequestId(),
+			ResourceProfile.UNKNOWN,
 			groupId,
 			Locality.UNCONSTRAINED);
 
@@ -491,6 +500,7 @@ public class SlotSharingManagerTest extends TestLogger {
 		// occupy the slot
 		resolvedRootSlot.allocateSingleTaskSlot(
 			new SlotRequestId(),
+			ResourceProfile.UNKNOWN,
 			groupId,
 			slotInfoAndLocality.getLocality());
 
@@ -525,6 +535,7 @@ public class SlotSharingManagerTest extends TestLogger {
 		// occupy the unresolved slot
 		unresolvedRootSlot.allocateSingleTaskSlot(
 			new SlotRequestId(),
+			ResourceProfile.UNKNOWN,
 			groupId,
 			Locality.UNKNOWN);
 
@@ -533,4 +544,65 @@ public class SlotSharingManagerTest extends TestLogger {
 		// we should no longer have a free unresolved root slot
 		assertNull(unresolvedRootSlot1);
 	}
+
+	@Test
+	public void testResourceCalculationOnSlotAllocatingAndReleasing() {
+		ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, 100, 100, Collections.emptyMap());
+		ResourceProfile rp2 = new ResourceProfile(2.0, 200, 200, 200, 200, 200, Collections.singletonMap("gpu", new GPUResource(2.0)));
+		ResourceProfile rp3 = new ResourceProfile(3.0, 300, 300, 300, 300, 300, Collections.singletonMap("gpu", new GPUResource(3.0)));
+
+		final TestingAllocatedSlotActions allocatedSlotActions = new TestingAllocatedSlotActions();
+
+		SlotSharingManager slotSharingManager = new SlotSharingManager(
+				SLOT_SHARING_GROUP_ID,
+				allocatedSlotActions,
+				SLOT_OWNER);
+
+		SlotSharingManager.MultiTaskSlot unresolvedRootSlot = slotSharingManager.createRootSlot(
+				new SlotRequestId(),
+				new CompletableFuture<>(),
+				new SlotRequestId());
+
+		// Allocates the left subtree.
+		SlotSharingManager.MultiTaskSlot leftMultiTaskSlot =
+				unresolvedRootSlot.allocateMultiTaskSlot(new SlotRequestId(), new SlotSharingGroupId());
+
+		SlotSharingManager.SingleTaskSlot firstChild = leftMultiTaskSlot.allocateSingleTaskSlot(
+				new SlotRequestId(),
+				rp1,
+				new SlotSharingGroupId(),
+				Locality.LOCAL);
+		SlotSharingManager.SingleTaskSlot secondChild = leftMultiTaskSlot.allocateSingleTaskSlot(
+				new SlotRequestId(),
+				rp2,
+				new SlotSharingGroupId(),
+				Locality.LOCAL);
+
+		assertEquals(rp1, firstChild.getReservedResources());
+		assertEquals(rp2, secondChild.getReservedResources());
+		assertEquals(rp1.merge(rp2), leftMultiTaskSlot.getReservedResources());
+		assertEquals(rp1.merge(rp2), unresolvedRootSlot.getReservedResources());
+
+		// Allocates the right subtree.
+		SlotSharingManager.SingleTaskSlot thirdChild = unresolvedRootSlot.allocateSingleTaskSlot(
+				new SlotRequestId(),
+				rp3,
+				new SlotSharingGroupId(),
+				Locality.LOCAL);
+		assertEquals(rp3, thirdChild.getReservedResources());
+		assertEquals(rp1.merge(rp2).merge(rp3), unresolvedRootSlot.getReservedResources());
+
+		// Releases the second child in the left-side tree.
+		secondChild.release(new Throwable("Release for testing"));
+		assertEquals(rp1, leftMultiTaskSlot.getReservedResources());
+		assertEquals(rp1.merge(rp3), unresolvedRootSlot.getReservedResources());
+
+		// Releases the third child in the right-side tree.
+		thirdChild.release(new Throwable("Release for testing"));
+		assertEquals(rp1, unresolvedRootSlot.getReservedResources());
+
+		// Releases the first child in the left-side tree.
+		firstChild.release(new Throwable("Release for testing"));
+		assertEquals(ResourceProfile.ZERO, unresolvedRootSlot.getReservedResources());
+	}
 }


[flink] 05/10: [hotfix][runtime] Preserve singleton property of UNKNOWN ResourceSpec and ResourceProfile

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e6ef67783006f57e9378e16559bff0d109cee0a6
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Jul 11 17:11:58 2019 +0200

    [hotfix][runtime] Preserve singleton property of UNKNOWN ResourceSpec and ResourceProfile
---
 .../org/apache/flink/api/common/operators/ResourceSpec.java | 13 +++++++++++++
 .../apache/flink/api/common/operators/ResourceSpecTest.java |  8 ++++++++
 .../runtime/clusterframework/types/ResourceProfile.java     | 13 +++++++++++++
 .../runtime/clusterframework/types/ResourceProfileTest.java |  8 ++++++++
 4 files changed, 42 insertions(+)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
index 7ddd2b3..dc6a611 100755
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
@@ -287,6 +287,19 @@ public class ResourceSpec implements Serializable {
 				'}';
 	}
 
+	// ------------------------------------------------------------------------
+	//  serialization
+	// ------------------------------------------------------------------------
+
+	private Object readResolve() {
+		// try to preserve the singleton property for UNKNOWN
+		return this.equals(UNKNOWN) ? UNKNOWN : this;
+	}
+
+	// ------------------------------------------------------------------------
+	//  builder
+	// ------------------------------------------------------------------------
+
 	public static Builder newBuilder() {
 		return new Builder();
 	}
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java
index 2227d89..b00be49 100755
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java
@@ -26,6 +26,7 @@ import org.junit.Test;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -205,4 +206,11 @@ public class ResourceSpecTest extends TestLogger {
 
 		assertEquals(ResourceSpec.UNKNOWN, merged);
 	}
+
+	@Test
+	public void testSingletonPropertyOfUnknown() throws Exception {
+		final ResourceSpec copiedSpec = CommonTestUtils.createCopySerializable(ResourceSpec.UNKNOWN);
+
+		assertSame(ResourceSpec.UNKNOWN, copiedSpec);
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index 3031b63..7ac1dd4 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -444,6 +444,19 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 			'}';
 	}
 
+	// ------------------------------------------------------------------------
+	//  serialization
+	// ------------------------------------------------------------------------
+
+	private Object readResolve() {
+		// try to preserve the singleton property for UNKNOWN
+		return this.equals(UNKNOWN) ? UNKNOWN : this;
+	}
+
+	// ------------------------------------------------------------------------
+	//  factories
+	// ------------------------------------------------------------------------
+
 	public static ResourceProfile fromResourceSpec(ResourceSpec resourceSpec, int networkMemory) {
 		if (ResourceSpec.UNKNOWN.equals(resourceSpec)) {
 			return UNKNOWN;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
index ffbfb5e..5264ceb 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
@@ -29,6 +29,7 @@ import java.util.Collections;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -257,4 +258,11 @@ public class ResourceProfileTest {
 
 		assertEquals(ResourceProfile.fromResourceSpec(ResourceSpec.UNKNOWN, 0), profile);
 	}
+
+	@Test
+	public void testSingletonPropertyOfUnknown() throws Exception {
+		final ResourceProfile copiedProfile = CommonTestUtils.createCopySerializable(ResourceProfile.UNKNOWN);
+
+		assertSame(ResourceProfile.UNKNOWN, copiedProfile);
+	}
 }


[flink] 01/10: [FLINK-12766][runtime] Introduce merge and subtract calculations and tests for ResourceProfile.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0421594f22d676ff15b6edf66b9861597000ff7d
Author: Xintong Song <to...@gmail.com>
AuthorDate: Sun Jul 7 09:44:08 2019 +0800

    [FLINK-12766][runtime] Introduce merge and subtract calculations and tests for ResourceProfile.
---
 .../flink/api/common/resources/Resource.java       |  35 ++++--
 .../clusterframework/types/ResourceProfile.java    | 121 ++++++++++++++++++++-
 .../types/ResourceProfileTest.java                 |  83 ++++++++++++++
 3 files changed, 231 insertions(+), 8 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java b/flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java
index 59448a8..cc1bcd2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java
@@ -61,24 +61,45 @@ public abstract class Resource implements Serializable {
 	}
 
 	public Resource merge(Resource other) {
-		Preconditions.checkArgument(getClass() == other.getClass(), "Merge with different resource resourceAggregateType");
-		Preconditions.checkArgument(this.name.equals(other.name), "Merge with different resource name");
-		Preconditions.checkArgument(this.resourceAggregateType == other.resourceAggregateType, "Merge with different aggregate resourceAggregateType");
+		Preconditions.checkArgument(getClass() == other.getClass(), "Merge with different resource type");
+		Preconditions.checkArgument(name.equals(other.name), "Merge with different resource name");
+		Preconditions.checkArgument(resourceAggregateType == other.resourceAggregateType, "Merge with different aggregate resourceAggregateType");
 
 		final double aggregatedValue;
 		switch (resourceAggregateType) {
 			case AGGREGATE_TYPE_MAX :
-				aggregatedValue = Math.max(other.value, this.value);
+				aggregatedValue = Math.max(value, other.value);
 				break;
 
 			case AGGREGATE_TYPE_SUM:
 			default:
-				aggregatedValue = this.value + other.value;
+				aggregatedValue = value + other.value;
 		}
 
 		return create(aggregatedValue, resourceAggregateType);
 	}
 
+	public Resource subtract(Resource other) {
+		Preconditions.checkArgument(getClass() == other.getClass(), "Minus with different resource type");
+		Preconditions.checkArgument(name.equals(other.name), "Minus with different resource name");
+		Preconditions.checkArgument(resourceAggregateType == other.resourceAggregateType, "Minus with different aggregate resourceAggregateType");
+		Preconditions.checkArgument(value >= other.value, "Try to subtract a larger resource from this one.");
+
+		final double subtractedValue;
+		switch (resourceAggregateType) {
+			case AGGREGATE_TYPE_MAX :
+				// TODO: For max, should check if the latest max item is removed and change accordingly.
+				subtractedValue = value;
+				break;
+
+			case AGGREGATE_TYPE_SUM:
+			default:
+				subtractedValue = value - other.value;
+		}
+
+		return create(subtractedValue, resourceAggregateType);
+	}
+
 	@Override
 	public boolean equals(Object o) {
 		if (this == o) {
@@ -105,11 +126,11 @@ public abstract class Resource implements Serializable {
 	}
 
 	public ResourceAggregateType getResourceAggregateType() {
-		return this.resourceAggregateType;
+		return resourceAggregateType;
 	}
 
 	public double getValue() {
-		return this.value;
+		return value;
 	}
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index 0d9eca3..0a02cd0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework.types;
 
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.resources.Resource;
+import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
 
@@ -30,6 +31,8 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * Describe the immutable resource profile of the slot, either when requiring or offering it. The profile can be
  * checked whether it can match another profile's requirement, and furthermore we may calculate a matching
@@ -48,7 +51,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 
 	private static final long serialVersionUID = 1L;
 
-	public static final ResourceProfile UNKNOWN = new ResourceProfile(-1.0, -1);
+	public static final ResourceProfile UNKNOWN = new ResourceProfile();
 
 	/** ResourceProfile which matches any other ResourceProfile. */
 	public static final ResourceProfile ANY = new ResourceProfile(Double.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Collections.emptyMap());
@@ -97,6 +100,12 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 			int networkMemoryInMB,
 			int managedMemoryInMB,
 			Map<String, Resource> extendedResources) {
+		Preconditions.checkArgument(cpuCores >= 0);
+		Preconditions.checkArgument(heapMemoryInMB >= 0);
+		Preconditions.checkArgument(directMemoryInMB >= 0);
+		Preconditions.checkArgument(nativeMemoryInMB >= 0);
+		Preconditions.checkArgument(networkMemoryInMB >= 0);
+		Preconditions.checkArgument(managedMemoryInMB >= 0);
 		this.cpuCores = cpuCores;
 		this.heapMemoryInMB = heapMemoryInMB;
 		this.directMemoryInMB = directMemoryInMB;
@@ -119,6 +128,18 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	}
 
 	/**
+	 * Creates a special ResourceProfile with negative values, indicating resources are unspecified.
+	 */
+	private ResourceProfile() {
+		this.cpuCores = -1.0;
+		this.heapMemoryInMB = -1;
+		this.directMemoryInMB = -1;
+		this.nativeMemoryInMB = -1;
+		this.networkMemoryInMB = -1;
+		this.managedMemoryInMB = -1;
+	}
+
+	/**
 	 * Creates a copy of the given ResourceProfile.
 	 *
 	 * @param other The ResourceProfile to copy.
@@ -309,6 +330,104 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 		return false;
 	}
 
+	/**
+	 * Calculates the sum of two resource profiles.
+	 *
+	 * @param other The other resource profile to add.
+	 * @return The merged resource profile.
+	 */
+	@Nonnull
+	public ResourceProfile merge(@Nonnull ResourceProfile other) {
+		if (equals(ANY) || other.equals(ANY)) {
+			return ANY;
+		}
+
+		if (this.equals(UNKNOWN) || other.equals(UNKNOWN)) {
+			return UNKNOWN;
+		}
+
+		Map<String, Resource> resultExtendedResource = new HashMap<>(extendedResources);
+
+		other.extendedResources.forEach((String name, Resource resource) -> {
+			resultExtendedResource.compute(name, (ignored, oldResource) ->
+				oldResource == null ? resource : oldResource.merge(resource));
+		});
+
+		return new ResourceProfile(
+			addNonNegativeDoublesConsideringOverflow(cpuCores, other.cpuCores),
+			addNonNegativeIntegersConsideringOverflow(heapMemoryInMB, other.heapMemoryInMB),
+			addNonNegativeIntegersConsideringOverflow(directMemoryInMB, other.directMemoryInMB),
+			addNonNegativeIntegersConsideringOverflow(nativeMemoryInMB, other.nativeMemoryInMB),
+			addNonNegativeIntegersConsideringOverflow(networkMemoryInMB, other.networkMemoryInMB),
+			addNonNegativeIntegersConsideringOverflow(managedMemoryInMB, other.managedMemoryInMB),
+			resultExtendedResource);
+	}
+
+	/**
+	 * Subtracts another resource profile from this one.
+	 *
+	 * @param other The other resource profile to subtract.
+	 * @return The subtracted resource profile.
+	 */
+	public ResourceProfile subtract(ResourceProfile other) {
+		if (equals(ANY) || other.equals(ANY)) {
+			return ANY;
+		}
+
+		if (this.equals(UNKNOWN) || other.equals(UNKNOWN)) {
+			return UNKNOWN;
+		}
+
+		checkArgument(isMatching(other), "Try to subtract an unmatched resource profile from this one.");
+
+		Map<String, Resource> resultExtendedResource = new HashMap<>(extendedResources);
+
+		other.extendedResources.forEach((String name, Resource resource) -> {
+			resultExtendedResource.compute(name, (ignored, oldResource) -> {
+				Resource resultResource = oldResource.subtract(resource);
+				return resultResource.getValue() == 0 ? null : resultResource;
+			});
+		});
+
+		return new ResourceProfile(
+			subtractDoublesConsideringInf(cpuCores, other.cpuCores),
+			subtractIntegersConsideringInf(heapMemoryInMB, other.heapMemoryInMB),
+			subtractIntegersConsideringInf(directMemoryInMB, other.directMemoryInMB),
+			subtractIntegersConsideringInf(nativeMemoryInMB, other.nativeMemoryInMB),
+			subtractIntegersConsideringInf(networkMemoryInMB, other.networkMemoryInMB),
+			subtractIntegersConsideringInf(managedMemoryInMB, other.managedMemoryInMB),
+			resultExtendedResource
+		);
+	}
+
+	private double addNonNegativeDoublesConsideringOverflow(double first, double second) {
+		double result = first + second;
+
+		if (result == Double.POSITIVE_INFINITY) {
+			return Double.MAX_VALUE;
+		}
+
+		return result;
+	}
+
+	private int addNonNegativeIntegersConsideringOverflow(int first, int second) {
+		int result = first + second;
+
+		if (result < 0) {
+			return Integer.MAX_VALUE;
+		}
+
+		return result;
+	}
+
+	private double subtractDoublesConsideringInf(double first, double second) {
+		return first == Double.MAX_VALUE ? Double.MAX_VALUE : first - second;
+	}
+
+	private int subtractIntegersConsideringInf(int first, int second) {
+		return first == Integer.MAX_VALUE ? Integer.MAX_VALUE : first - second;
+	}
+
 	@Override
 	public String toString() {
 		final StringBuilder resources = new StringBuilder(extendedResources.size() * 10);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
index a6c5f58..f80234b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.clusterframework.types;
 
 import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.common.resources.GPUResource;
+
 import org.junit.Test;
 
 import java.util.Collections;
@@ -26,6 +28,7 @@ import java.util.Collections;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class ResourceProfileTest {
 
@@ -162,4 +165,84 @@ public class ResourceProfileTest {
 		assertEquals(100, rp.getOperatorsMemoryInMB());
 		assertEquals(1.6, rp.getExtendedResources().get(ResourceSpec.GPU_NAME).getValue(), 0.000001);
 	}
+
+	@Test
+	public void testMerge() throws Exception {
+		ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, 100, 100, Collections.emptyMap());
+		ResourceProfile rp2 = new ResourceProfile(2.0, 200, 200, 200, 200, 200,
+				Collections.singletonMap("gpu", new GPUResource(2.0)));
+
+		ResourceProfile rp1MergeRp1 = new ResourceProfile(2.0, 200, 200, 200, 200, 200,
+				Collections.emptyMap());
+		ResourceProfile rp1MergeRp2 = new ResourceProfile(3.0, 300, 300, 300, 300, 300,
+				Collections.singletonMap("gpu", new GPUResource(2.0)));
+		ResourceProfile rp2MergeRp2 = new ResourceProfile(4.0, 400, 400, 400, 400, 400,
+				Collections.singletonMap("gpu", new GPUResource(4.0)));
+
+		assertEquals(rp1MergeRp1, rp1.merge(rp1));
+		assertEquals(rp1MergeRp2, rp1.merge(rp2));
+		assertEquals(rp1MergeRp2, rp2.merge(rp1));
+		assertEquals(rp2MergeRp2, rp2.merge(rp2));
+
+		assertEquals(ResourceProfile.UNKNOWN, rp1.merge(ResourceProfile.UNKNOWN));
+		assertEquals(ResourceProfile.UNKNOWN, ResourceProfile.UNKNOWN.merge(rp1));
+		assertEquals(ResourceProfile.UNKNOWN, ResourceProfile.UNKNOWN.merge(ResourceProfile.UNKNOWN));
+		assertEquals(ResourceProfile.ANY, rp1.merge(ResourceProfile.ANY));
+		assertEquals(ResourceProfile.ANY, ResourceProfile.ANY.merge(rp1));
+		assertEquals(ResourceProfile.ANY, ResourceProfile.ANY.merge(ResourceProfile.ANY));
+	}
+
+	@Test
+	public void testMergeWithOverflow() throws Exception {
+		final double LARGE_DOUBLE = Double.MAX_VALUE - 1.0;
+		final int LARGE_INTEGER = Integer.MAX_VALUE - 100;
+
+		ResourceProfile rp1 = new ResourceProfile(3.0, 300, 300, 300, 300, 300, Collections.emptyMap());
+		ResourceProfile rp2 = new ResourceProfile(LARGE_DOUBLE, LARGE_INTEGER, LARGE_INTEGER, LARGE_INTEGER, LARGE_INTEGER, LARGE_INTEGER, Collections.emptyMap());
+
+		assertEquals(ResourceProfile.ANY, rp2.merge(rp2));
+		assertEquals(ResourceProfile.ANY, rp2.merge(rp1));
+		assertEquals(ResourceProfile.ANY, rp1.merge(rp2));
+	}
+
+	@Test
+	public void testSubtract() throws Exception {
+		ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, 100, 100, Collections.emptyMap());
+		ResourceProfile rp2 = new ResourceProfile(2.0, 200, 200, 200, 200, 200, Collections.emptyMap());
+		ResourceProfile rp3 = new ResourceProfile(3.0, 300, 300, 300, 300, 300, Collections.emptyMap());
+
+		assertEquals(rp1, rp3.subtract(rp2));
+		assertEquals(rp1, rp2.subtract(rp1));
+
+		ResourceProfile rp4 = new ResourceProfile(Double.MAX_VALUE, 100, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Collections.emptyMap());
+		ResourceProfile rp5 = new ResourceProfile(Double.MAX_VALUE, 0, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Collections.emptyMap());
+
+		assertEquals(rp5, rp4.subtract(rp1));
+
+		try {
+			ResourceProfile ignored = rp1.subtract(rp2);
+			fail("The subtract should failed due to trying to subtract a larger resource");
+		} catch (IllegalArgumentException ex) {
+			// Ignore ex.
+		}
+
+		assertEquals(ResourceProfile.ANY, ResourceProfile.ANY.subtract(rp3));
+		assertEquals(ResourceProfile.ANY, ResourceProfile.ANY.subtract(ResourceProfile.ANY));
+		assertEquals(ResourceProfile.ANY, rp3.subtract(ResourceProfile.ANY));
+
+		assertEquals(ResourceProfile.UNKNOWN, ResourceProfile.UNKNOWN.subtract(rp3));
+		assertEquals(ResourceProfile.UNKNOWN, rp3.subtract(ResourceProfile.UNKNOWN));
+		assertEquals(ResourceProfile.UNKNOWN, ResourceProfile.UNKNOWN.subtract(ResourceProfile.UNKNOWN));
+	}
+
+	@Test
+	public void testSubtractWithInfValues() {
+		// Does not equal ANY since it has extended resources.
+		ResourceProfile rp1 = new ResourceProfile(Double.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE,
+				Integer.MAX_VALUE, Collections.singletonMap("gpu", new GPUResource(4.0)));
+		ResourceProfile rp2 = new ResourceProfile(2.0, 200, 200, 200, 200, 200,
+				Collections.emptyMap());
+
+		assertEquals(rp1, rp1.subtract(rp2));
+	}
 }


[flink] 04/10: [FLINK-12766][runtime] Fix bug in merging and converting UNKNOWN ResourceSpecs.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2947015e664ddff083e407c539cde0b7f78a83de
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Jul 11 16:54:07 2019 +0200

    [FLINK-12766][runtime] Fix bug in merging and converting UNKNOWN ResourceSpecs.
---
 .../flink/api/common/operators/ResourceSpec.java   |  2 +-
 .../api/common/operators/ResourceSpecTest.java     | 48 ++++++++++++++++++++++
 .../clusterframework/types/ResourceProfile.java    |  2 +-
 .../types/ResourceProfileTest.java                 |  9 ++++
 4 files changed, 59 insertions(+), 2 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
old mode 100644
new mode 100755
index 4ce5911..7ddd2b3
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
@@ -139,7 +139,7 @@ public class ResourceSpec implements Serializable {
 	 * @return The new resource with merged values.
 	 */
 	public ResourceSpec merge(ResourceSpec other) {
-		if (this == UNKNOWN || other == UNKNOWN) {
+		if (this.equals(UNKNOWN) || other.equals(UNKNOWN)) {
 			return UNKNOWN;
 		}
 
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java
index c6aa04b..2227d89 100755
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java
@@ -157,4 +157,52 @@ public class ResourceSpecTest extends TestLogger {
 		ResourceSpec rs2 = CommonTestUtils.createCopySerializable(rs1);
 		assertEquals(rs1, rs2);
 	}
+
+	@Test
+	public void testMergeThisUnknown() throws Exception {
+		final ResourceSpec spec1 = ResourceSpec.UNKNOWN;
+		final ResourceSpec spec2 = ResourceSpec.newBuilder()
+				.setCpuCores(1.0)
+				.setHeapMemoryInMB(100)
+				.setGPUResource(1.1)
+				.build();
+
+		final ResourceSpec merged = spec1.merge(spec2);
+
+		assertEquals(ResourceSpec.UNKNOWN, merged);
+	}
+
+	@Test
+	public void testMergeOtherUnknown() throws Exception {
+		final ResourceSpec spec1 = ResourceSpec.newBuilder()
+			.setCpuCores(1.0)
+			.setHeapMemoryInMB(100)
+			.setGPUResource(1.1)
+			.build();
+		final ResourceSpec spec2 = ResourceSpec.UNKNOWN;
+
+		final ResourceSpec merged = spec1.merge(spec2);
+
+		assertEquals(ResourceSpec.UNKNOWN, merged);
+	}
+
+	@Test
+	public void testMergeBothUnknown() throws Exception {
+		final ResourceSpec spec1 = ResourceSpec.UNKNOWN;
+		final ResourceSpec spec2 = ResourceSpec.UNKNOWN;
+
+		final ResourceSpec merged = spec1.merge(spec2);
+
+		assertEquals(ResourceSpec.UNKNOWN, merged);
+	}
+
+	@Test
+	public void testMergeWithSerializationCopy() throws Exception {
+		final ResourceSpec spec1 = CommonTestUtils.createCopySerializable(ResourceSpec.UNKNOWN);
+		final ResourceSpec spec2 = CommonTestUtils.createCopySerializable(ResourceSpec.UNKNOWN);
+
+		final ResourceSpec merged = spec1.merge(spec2);
+
+		assertEquals(ResourceSpec.UNKNOWN, merged);
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
old mode 100644
new mode 100755
index 840825a..3031b63
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -445,7 +445,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	}
 
 	public static ResourceProfile fromResourceSpec(ResourceSpec resourceSpec, int networkMemory) {
-		if (resourceSpec == ResourceSpec.UNKNOWN) {
+		if (ResourceSpec.UNKNOWN.equals(resourceSpec)) {
 			return UNKNOWN;
 		}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
index b608ca1..ffbfb5e 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework.types;
 
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.resources.GPUResource;
+import org.apache.flink.core.testutils.CommonTestUtils;
 
 import org.junit.Test;
 
@@ -248,4 +249,12 @@ public class ResourceProfileTest {
 
 		assertEquals(rp1, rp1.subtract(rp2));
 	}
+
+	@Test
+	public void testFromSpecWithSerializationCopy() throws Exception {
+		final ResourceSpec copiedSpec = CommonTestUtils.createCopySerializable(ResourceSpec.UNKNOWN);
+		final ResourceProfile profile = ResourceProfile.fromResourceSpec(copiedSpec, 0);
+
+		assertEquals(ResourceProfile.fromResourceSpec(ResourceSpec.UNKNOWN, 0), profile);
+	}
 }


[flink] 09/10: [FLINK-12765][jobmanager] Let some slot reqests fail if the sharing slot is oversubscribed

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a587576ef513f1df5a3c19b0e01a9eb43ec6f80e
Author: Gao Yun <yu...@alibaba-inc.com>
AuthorDate: Sat Jun 22 00:13:56 2019 +0800

    [FLINK-12765][jobmanager] Let some slot reqests fail if the sharing slot is oversubscribed
---
 .../runtime/jobmaster/slotpool/SchedulerImpl.java  |  53 ++++-
 .../SharedSlotOversubscribedException.java         |  40 ++++
 .../jobmaster/slotpool/SlotSharingManager.java     |  89 ++++++++-
 .../jobmaster/slotpool/SlotPoolCoLocationTest.java | 162 +++++++++++++++
 .../slotpool/SlotPoolSlotSharingTest.java          |  92 +++++++++
 .../jobmaster/slotpool/SlotSharingManagerTest.java | 219 +++++++++++++++++++++
 6 files changed, 640 insertions(+), 15 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
index 57c42b8..ef87dd1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.jobmaster.SlotContext;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
@@ -145,24 +146,53 @@ public class SchedulerImpl implements Scheduler {
 		componentMainThreadExecutor.assertRunningInMainThread();
 
 		final CompletableFuture<LogicalSlot> allocationResultFuture = new CompletableFuture<>();
+		internalAllocateSlot(
+				allocationResultFuture,
+				slotRequestId,
+				scheduledUnit,
+				slotProfile,
+				allowQueuedScheduling,
+				allocationTimeout);
+		return allocationResultFuture;
+	}
 
+	private void internalAllocateSlot(
+			CompletableFuture<LogicalSlot> allocationResultFuture,
+			SlotRequestId slotRequestId,
+			ScheduledUnit scheduledUnit,
+			SlotProfile slotProfile,
+			boolean allowQueuedScheduling,
+			Time allocationTimeout) {
 		CompletableFuture<LogicalSlot> allocationFuture = scheduledUnit.getSlotSharingGroupId() == null ?
 			allocateSingleSlot(slotRequestId, slotProfile, allowQueuedScheduling, allocationTimeout) :
 			allocateSharedSlot(slotRequestId, scheduledUnit, slotProfile, allowQueuedScheduling, allocationTimeout);
 
 		allocationFuture.whenComplete((LogicalSlot slot, Throwable failure) -> {
 			if (failure != null) {
-				cancelSlotRequest(
-					slotRequestId,
-					scheduledUnit.getSlotSharingGroupId(),
-					failure);
-				allocationResultFuture.completeExceptionally(failure);
+				Optional<SharedSlotOversubscribedException> sharedSlotOverAllocatedException =
+						ExceptionUtils.findThrowable(failure, SharedSlotOversubscribedException.class);
+				if (sharedSlotOverAllocatedException.isPresent() &&
+						sharedSlotOverAllocatedException.get().canRetry()) {
+
+					// Retry the allocation
+					internalAllocateSlot(
+							allocationResultFuture,
+							slotRequestId,
+							scheduledUnit,
+							slotProfile,
+							allowQueuedScheduling,
+							allocationTimeout);
+				} else {
+					cancelSlotRequest(
+							slotRequestId,
+							scheduledUnit.getSlotSharingGroupId(),
+							failure);
+					allocationResultFuture.completeExceptionally(failure);
+				}
 			} else {
 				allocationResultFuture.complete(slot);
 			}
 		});
-
-		return allocationResultFuture;
 	}
 
 	@Override
@@ -354,7 +384,14 @@ public class SchedulerImpl implements Scheduler {
 
 			if (taskSlot != null) {
 				Preconditions.checkState(taskSlot instanceof SlotSharingManager.MultiTaskSlot);
-				return SlotSharingManager.MultiTaskSlotLocality.of(((SlotSharingManager.MultiTaskSlot) taskSlot), Locality.LOCAL);
+
+				SlotSharingManager.MultiTaskSlot multiTaskSlot = (SlotSharingManager.MultiTaskSlot) taskSlot;
+
+				if (multiTaskSlot.mayHaveEnoughResourcesToFulfill(slotProfile.getResourceProfile())) {
+					return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, Locality.LOCAL);
+				}
+
+				throw new NoResourceAvailableException("Not enough resources in the slot for all co-located tasks.");
 			} else {
 				// the slot may have been cancelled in the mean time
 				coLocationConstraint.setSlotRequestId(null);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SharedSlotOversubscribedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SharedSlotOversubscribedException.java
new file mode 100644
index 0000000..8a61612
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SharedSlotOversubscribedException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+/**
+ * If a shared slot is over-allocated before it has been resolved,
+ * some requests are rejected with this exception to ensure that the
+ * total requested resources do not exceed the total resources. The
+ * rejected requests can be retried if {@code canRetry} is set.
+ */
+class SharedSlotOversubscribedException extends Exception {
+
+	/** Whether the requester can retry the request. */
+	private final boolean canRetry;
+
+	SharedSlotOversubscribedException(String message, boolean canRetry) {
+		super(message);
+		this.canRetry = canRetry;
+	}
+
+	boolean canRetry() {
+		return canRetry;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
index a7afbac..2c81a91 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -41,6 +41,7 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.AbstractCollection;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -48,6 +49,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -356,9 +358,9 @@ public class SlotSharingManager {
 				CompletableFuture<? extends SlotContext> slotContextFuture,
 				@Nullable SlotRequestId allocatedSlotRequestId) {
 			super(slotRequestId, groupId);
+			Preconditions.checkNotNull(slotContextFuture);
 
 			this.parent = parent;
-			this.slotContextFuture = Preconditions.checkNotNull(slotContextFuture);
 			this.allocatedSlotRequestId = allocatedSlotRequestId;
 
 			this.children = new HashMap<>(16);
@@ -366,12 +368,19 @@ public class SlotSharingManager {
 
 			this.reservedResources = ResourceProfile.ZERO;
 
-			slotContextFuture.whenComplete(
-				(SlotContext ignored, Throwable throwable) -> {
-					if (throwable != null) {
-						release(throwable);
-					}
-				});
+			this.slotContextFuture = slotContextFuture.handle((SlotContext slotContext, Throwable throwable) -> {
+				if (throwable != null) {
+					// If the underlying resource request failed, we currently fail all the requests
+					release(throwable);
+					throw new CompletionException(throwable);
+				}
+
+				if (parent == null) {
+					checkOversubscriptionAndReleaseChildren(slotContext);
+				}
+
+				return slotContext;
+			});
 		}
 
 		CompletableFuture<? extends SlotContext> getSlotContextFuture() {
@@ -511,6 +520,30 @@ public class SlotSharingManager {
 		}
 
 		/**
+		 * Checks whether this task slot may have enough resources to fulfill the given
+		 * request. While the slot context future is not yet done, this returns true.
+		 *
+		 * @param resourceProfile The resource profile of the request to check.
+		 * @return Whether the slot may be able to fulfill the request.
+		 */
+		boolean mayHaveEnoughResourcesToFulfill(ResourceProfile resourceProfile) {
+			if (!slotContextFuture.isDone()) {
+				return true;
+			}
+
+			MultiTaskSlot root = this;
+
+			while (root.parent != null) { // walk up to the root of the slot hierarchy
+				root = root.parent;
+			}
+
+			SlotContext slotContext = root.getSlotContextFuture().join(); // NOTE(review): join() rethrows if the future failed - confirm this cannot be reached for failed slots
+
+			return slotContext.getResourceProfile().isMatching(
+					resourceProfile.merge(root.getReservedResources()));
+		}
+
+		/**
 		 * Releases the child with the given childGroupId.
 		 *
 		 * @param childGroupId identifying the child to release
@@ -548,6 +581,48 @@ public class SlotSharingManager {
 			}
 		}
 
+		private void checkOversubscriptionAndReleaseChildren(SlotContext slotContext) {
+			final ResourceProfile slotResources = slotContext.getResourceProfile();
+			final ArrayList<TaskSlot> childrenToEvict = new ArrayList<>();
+			ResourceProfile requiredResources = ResourceProfile.ZERO;
+
+			for (TaskSlot slot : children.values()) {
+				final ResourceProfile resourcesWithChild = requiredResources.merge(slot.getReservedResources());
+
+				if (slotResources.isMatching(resourcesWithChild)) { // child still fits: keep it and account for it
+					requiredResources = resourcesWithChild;
+				} else {
+					childrenToEvict.add(slot);
+				}
+			}
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Not all requests are fulfilled due to over-allocated, number of requests is {}, " +
+						"number of evicted requests is {}, underlying allocated is {}, fulfilled is {}, " +
+						"evicted requests is {},",
+					children.size(),
+					childrenToEvict.size(),
+					slotContext.getResourceProfile(),
+					requiredResources,
+					childrenToEvict);
+			}
+
+			if (childrenToEvict.size() == children.size()) {
+				// Since the RM always returns a slot whose resources are larger than the requested
+				// ones, this situation only happens when we request a slot from the RM using the
+				// resource profile of a task that belongs to a CoLocationGroup. Similar to dealing
+				// with the failure of the underlying request, we currently fail all the requests
+				// directly.
+				release(new SharedSlotOversubscribedException(
+					"The allocated slot does not have enough resource for any task.", false));
+			} else {
+				for (TaskSlot taskSlot : childrenToEvict) { // release only the children that do not fit; they may retry
+					taskSlot.release(new SharedSlotOversubscribedException(
+						"The allocated slot does not have enough resource for all the tasks.", true));
+				}
+			}
+		}
+
 		@Override
 		public String toString() {
 			String physicalSlotDescription;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java
index 44c9977..52cc3af 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java
@@ -41,6 +41,7 @@ import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -51,6 +52,7 @@ import java.util.concurrent.ExecutionException;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test cases for {@link CoLocationConstraint} with the {@link SlotPoolImpl}.
@@ -158,4 +160,164 @@ public class SlotPoolCoLocationTest extends TestLogger {
 		assertEquals(logicalSlot21.getAllocationId(), logicalSlot22.getAllocationId());
 		assertNotEquals(logicalSlot11.getAllocationId(), logicalSlot21.getAllocationId());
 	}
+
+	@Test
+	public void testCoLocatedSlotRequestsFailBeforeResolved() throws ExecutionException, InterruptedException {
+		final ResourceProfile rp1 = new ResourceProfile(1.0, 100);
+		final ResourceProfile rp2 = new ResourceProfile(2.0, 200);
+		final ResourceProfile rp3 = new ResourceProfile(5.0, 500);
+
+		final ResourceProfile allocatedSlotRp = new ResourceProfile(3.0, 300);
+
+		final BlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(1);
+
+		final TestingResourceManagerGateway testingResourceManagerGateway = slotPoolResource.getTestingResourceManagerGateway();
+
+		testingResourceManagerGateway.setRequestSlotConsumer(
+				(SlotRequest slotRequest) -> allocationIds.offer(slotRequest.getAllocationId()));
+
+		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+
+		final SlotPool slotPoolGateway = slotPoolResource.getSlotPool();
+		slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
+
+		CoLocationGroup group = new CoLocationGroup();
+		CoLocationConstraint coLocationConstraint1 = group.getLocationConstraint(0);
+
+		final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
+
+		JobVertexID jobVertexId1 = new JobVertexID();
+		JobVertexID jobVertexId2 = new JobVertexID();
+		JobVertexID jobVertexId3 = new JobVertexID();
+
+		final SlotProvider slotProvider = slotPoolResource.getSlotProvider();
+		CompletableFuture<LogicalSlot> logicalSlotFuture1 = slotProvider.allocateSlot(
+				new ScheduledUnit(
+						jobVertexId1,
+						slotSharingGroupId,
+						coLocationConstraint1),
+				true,
+				SlotProfile.noLocality(rp1),
+				TestingUtils.infiniteTime());
+
+		CompletableFuture<LogicalSlot> logicalSlotFuture2 = slotProvider.allocateSlot(
+				new ScheduledUnit(
+						jobVertexId2,
+						slotSharingGroupId,
+						coLocationConstraint1),
+				true,
+				SlotProfile.noLocality(rp2),
+				TestingUtils.infiniteTime());
+
+		CompletableFuture<LogicalSlot> logicalSlotFuture3 = slotProvider.allocateSlot(
+				new ScheduledUnit(
+						jobVertexId3,
+						slotSharingGroupId,
+						coLocationConstraint1),
+				true,
+				SlotProfile.noLocality(rp3),
+				TestingUtils.infiniteTime());
+
+		final AllocationID allocationId1 = allocationIds.take();
+
+		Collection<SlotOffer> slotOfferFuture1 = slotPoolGateway.offerSlots(
+				taskManagerLocation,
+				new SimpleAckingTaskManagerGateway(),
+				Collections.singletonList(new SlotOffer(
+						allocationId1,
+						0,
+						allocatedSlotRp)));
+
+		assertFalse(slotOfferFuture1.isEmpty());
+
+		for (CompletableFuture<LogicalSlot> logicalSlotFuture : Arrays.asList(logicalSlotFuture1, logicalSlotFuture2, logicalSlotFuture3)) {
+			assertTrue(logicalSlotFuture.isDone() && logicalSlotFuture.isCompletedExceptionally());
+			logicalSlotFuture.whenComplete((LogicalSlot ignored, Throwable throwable) -> {
+				assertTrue(throwable instanceof SharedSlotOversubscribedException);
+				assertTrue(((SharedSlotOversubscribedException) throwable).canRetry());
+			});
+		}
+	}
+
+	@Test
+	public void testCoLocatedSlotRequestsFailAfterResolved() throws ExecutionException, InterruptedException {
+		final ResourceProfile rp1 = new ResourceProfile(1.0, 100);
+		final ResourceProfile rp2 = new ResourceProfile(2.0, 200);
+		final ResourceProfile rp3 = new ResourceProfile(5.0, 500);
+
+		final ResourceProfile allocatedSlotRp = new ResourceProfile(3.0, 300);
+
+		final BlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(1);
+
+		final TestingResourceManagerGateway testingResourceManagerGateway = slotPoolResource.getTestingResourceManagerGateway();
+
+		testingResourceManagerGateway.setRequestSlotConsumer(
+				(SlotRequest slotRequest) -> allocationIds.offer(slotRequest.getAllocationId()));
+
+		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+
+		final SlotPool slotPoolGateway = slotPoolResource.getSlotPool();
+		slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
+
+		CoLocationGroup group = new CoLocationGroup();
+		CoLocationConstraint coLocationConstraint1 = group.getLocationConstraint(0);
+
+		final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
+
+		JobVertexID jobVertexId1 = new JobVertexID();
+		JobVertexID jobVertexId2 = new JobVertexID();
+		JobVertexID jobVertexId3 = new JobVertexID();
+
+		final SlotProvider slotProvider = slotPoolResource.getSlotProvider();
+		CompletableFuture<LogicalSlot> logicalSlotFuture1 = slotProvider.allocateSlot(
+				new ScheduledUnit(
+						jobVertexId1,
+						slotSharingGroupId,
+						coLocationConstraint1),
+				true,
+				SlotProfile.noLocality(rp1),
+				TestingUtils.infiniteTime());
+
+		CompletableFuture<LogicalSlot> logicalSlotFuture2 = slotProvider.allocateSlot(
+				new ScheduledUnit(
+						jobVertexId2,
+						slotSharingGroupId,
+						coLocationConstraint1),
+				true,
+				SlotProfile.noLocality(rp2),
+				TestingUtils.infiniteTime());
+
+		final AllocationID allocationId1 = allocationIds.take();
+
+		Collection<SlotOffer> slotOfferFuture1 = slotPoolGateway.offerSlots(
+				taskManagerLocation,
+				new SimpleAckingTaskManagerGateway(),
+				Collections.singletonList(new SlotOffer(
+						allocationId1,
+						0,
+						allocatedSlotRp)));
+
+		assertFalse(slotOfferFuture1.isEmpty());
+
+		CompletableFuture<LogicalSlot> logicalSlotFuture3 = slotProvider.allocateSlot(
+				new ScheduledUnit(
+						jobVertexId3,
+						slotSharingGroupId,
+						coLocationConstraint1),
+				true,
+				SlotProfile.noLocality(rp3),
+				TestingUtils.infiniteTime());
+
+		LogicalSlot logicalSlot1 = logicalSlotFuture1.get();
+		LogicalSlot logicalSlot2 = logicalSlotFuture2.get();
+
+		assertEquals(allocationId1, logicalSlot1.getAllocationId());
+		assertEquals(allocationId1, logicalSlot2.getAllocationId());
+
+		assertTrue(logicalSlotFuture3.isDone() && logicalSlotFuture3.isCompletedExceptionally());
+		logicalSlotFuture3.whenComplete((LogicalSlot ignored, Throwable throwable) -> {
+			assertTrue(throwable instanceof SharedSlotOversubscribedException);
+			assertTrue(((SharedSlotOversubscribedException) throwable).canRetry());
+		});
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java
index f1447d0..f2b8fa7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java
@@ -331,4 +331,96 @@ public class SlotPoolSlotSharingTest extends TestLogger {
 		assertEquals(allocationId2, logicalSlot3.getAllocationId());
 	}
 
+	@Test
+	public void testRetryOnSharedSlotOverAllocated() throws InterruptedException, ExecutionException {
+		final ResourceProfile rp1 = new ResourceProfile(1.0, 100);
+		final ResourceProfile rp2 = new ResourceProfile(2.0, 200);
+		final ResourceProfile rp3 = new ResourceProfile(5.0, 500);
+
+		final ResourceProfile firstAllocatedSlotRp = new ResourceProfile(3.0, 300);
+		final ResourceProfile secondAllocatedSlotRp = new ResourceProfile(5.0, 500);
+
+		final BlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(2);
+		final TestingResourceManagerGateway testingResourceManagerGateway = slotPoolResource.getTestingResourceManagerGateway();
+		testingResourceManagerGateway.setRequestSlotConsumer(
+				(SlotRequest slotRequest) -> allocationIds.offer(slotRequest.getAllocationId()));
+
+		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+
+		final SlotPoolImpl slotPool = slotPoolResource.getSlotPool();
+		slotPool.registerTaskManager(taskManagerLocation.getResourceID());
+
+		final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
+		final JobVertexID jobVertexId1 = new JobVertexID();
+		final JobVertexID jobVertexId2 = new JobVertexID();
+		final JobVertexID jobVertexId3 = new JobVertexID();
+
+		final SlotProvider slotProvider = slotPoolResource.getSlotProvider();
+		CompletableFuture<LogicalSlot> logicalSlotFuture1 = slotProvider.allocateSlot(
+				new ScheduledUnit(
+						jobVertexId1,
+						slotSharingGroupId,
+						null),
+				true,
+				SlotProfile.noLocality(rp1),
+				TestingUtils.infiniteTime());
+
+		CompletableFuture<LogicalSlot> logicalSlotFuture2 = slotProvider.allocateSlot(
+				new ScheduledUnit(
+						jobVertexId2,
+						slotSharingGroupId,
+						null),
+				true,
+				SlotProfile.noLocality(rp2),
+				TestingUtils.infiniteTime());
+
+		CompletableFuture<LogicalSlot> logicalSlotFuture3 = slotProvider.allocateSlot(
+				new ScheduledUnit(
+						jobVertexId3,
+						slotSharingGroupId,
+						null),
+				true,
+				SlotProfile.noLocality(rp3),
+				TestingUtils.infiniteTime());
+
+		assertFalse(logicalSlotFuture1.isDone());
+		assertFalse(logicalSlotFuture2.isDone());
+		assertFalse(logicalSlotFuture3.isDone());
+
+		final AllocationID allocationId1 = allocationIds.take();
+
+		// This should fulfill the first two requests.
+		boolean offerFuture = slotPool.offerSlot(
+				taskManagerLocation,
+				new SimpleAckingTaskManagerGateway(),
+				new SlotOffer(
+						allocationId1,
+						0,
+						firstAllocatedSlotRp));
+
+		assertTrue(offerFuture);
+
+		LogicalSlot logicalSlot1 = logicalSlotFuture1.get();
+		LogicalSlot logicalSlot2 = logicalSlotFuture2.get();
+
+		assertEquals(allocationId1, logicalSlot1.getAllocationId());
+		assertEquals(allocationId1, logicalSlot2.getAllocationId());
+
+		// The third request will retry.
+		assertFalse(logicalSlotFuture3.isDone());
+		final AllocationID allocationId2 = allocationIds.take();
+
+		offerFuture = slotPool.offerSlot(
+				taskManagerLocation,
+				new SimpleAckingTaskManagerGateway(),
+				new SlotOffer(
+						allocationId2,
+						1,
+						secondAllocatedSlotRp));
+
+		assertTrue(offerFuture);
+
+		LogicalSlot logicalSlot3 = logicalSlotFuture3.get();
+		assertEquals(allocationId2, logicalSlot3.getAllocationId());
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
index 0742ac2..9bc1d9b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.api.common.resources.GPUResource;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
@@ -27,11 +28,13 @@ import org.apache.flink.runtime.instance.SimpleSlotContext;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotContext;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
@@ -39,18 +42,28 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Test;
 
+import javax.annotation.Nonnull;
+
+import java.net.InetAddress;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 /**
  * Test cases for the {@link SlotSharingManager}.
@@ -605,4 +618,210 @@ public class SlotSharingManagerTest extends TestLogger {
 		firstChild.release(new Throwable("Release for testing"));
 		assertEquals(ResourceProfile.ZERO, unresolvedRootSlot.getReservedResources());
 	}
+
+	/**
+	 * Checks the answers of {@code mayHaveEnoughResourcesToFulfill} on a nested multi task slot.
+	 *
+	 * <p>The nested slot holds one child that requested (1.0, 100). The test asserts that every
+	 * request is accepted while the root slot is unresolved, and that a (2.0, 200) request is
+	 * rejected once the root slot is resolved with an allocated slot of (2.0, 200).
+	 */
+	@Test
+	public void testHashEnoughResourceOfMultiTaskSlot() {
+		final ResourceProfile smallProfile = new ResourceProfile(1.0, 100);
+		final ResourceProfile largeProfile = new ResourceProfile(2.0, 200);
+		final ResourceProfile slotProfile = new ResourceProfile(2.0, 200);
+
+		final TestingAllocatedSlotActions slotActions = new TestingAllocatedSlotActions();
+		final SlotSharingManager manager =
+				new SlotSharingManager(SLOT_SHARING_GROUP_ID, slotActions, SLOT_OWNER);
+
+		final CompletableFuture<SlotContext> rootSlotContext = new CompletableFuture<>();
+		final SlotSharingManager.MultiTaskSlot rootSlot =
+				manager.createRootSlot(new SlotRequestId(), rootSlotContext, new SlotRequestId());
+
+		final SlotSharingManager.MultiTaskSlot nestedSlot =
+				rootSlot.allocateMultiTaskSlot(new SlotRequestId(), new SlotSharingGroupId());
+
+		// The returned child is not inspected; the call only has to register the small request.
+		nestedSlot.allocateSingleTaskSlot(
+				new SlotRequestId(), smallProfile, new SlotSharingGroupId(), Locality.LOCAL);
+
+		// As long as the root slot is unresolved, every request is expected to be accepted.
+		for (ResourceProfile request : Arrays.asList(smallProfile, largeProfile, ResourceProfile.UNKNOWN)) {
+			assertThat(nestedSlot.mayHaveEnoughResourcesToFulfill(request), is(true));
+		}
+
+		final TaskManagerLocation location =
+				new TaskManagerLocation(new ResourceID("tm-X"), InetAddress.getLoopbackAddress(), 46);
+		final AllocatedSlot allocatedSlot = new AllocatedSlot(
+				new AllocationID(), location, 0, slotProfile, mock(TaskManagerGateway.class));
+		rootSlotContext.complete(allocatedSlot);
+
+		// Once resolved, only the large request is expected to be turned down.
+		assertThat(nestedSlot.mayHaveEnoughResourcesToFulfill(smallProfile), is(true));
+		assertThat(nestedSlot.mayHaveEnoughResourcesToFulfill(largeProfile), is(false));
+		assertThat(nestedSlot.mayHaveEnoughResourcesToFulfill(ResourceProfile.UNKNOWN), is(true));
+	}
+
+	/** A (16.0, 1600) slot matches the sum of all four requests, so each one has to complete normally. */
+	@Test
+	public void testSlotAllocatedWithEnoughResource() {
+		final SlotSharingResourceTestContext testContext = createResourceTestContext(new ResourceProfile(16.0, 1600));
+
+		testContext.singleTaskSlotsInOrder.forEach(taskSlot -> {
+			assertThat(taskSlot.getLogicalSlotFuture().isDone(), is(true));
+			assertThat(taskSlot.getLogicalSlotFuture().isCompletedExceptionally(), is(false));
+		});
+
+		// The co-location multi task slot must still be registered at the manager.
+		assertThat(testContext.slotSharingManager.getTaskSlot(testContext.coLocationTaskSlot.getSlotRequestId()), notNullValue());
+	}
+
+	/** A (7.0, 700) slot is expected to serve both co-located requests and the third one, but not the fourth. */
+	@Test
+	public void testSlotOverAllocatedAndSingleSlotReleased() {
+		SlotSharingResourceTestContext context = createResourceTestContext(new ResourceProfile(7.0, 700));
+
+		for (int i = 0; i < context.singleTaskSlotsInOrder.size(); ++i) {
+			CompletableFuture<LogicalSlot> slotFuture = context.singleTaskSlotsInOrder.get(i).getLogicalSlotFuture();
+			assertThat(slotFuture.isDone(), is(true));
+			// Only the fourth request (index 3) has to fail.
+			assertThat(slotFuture.isCompletedExceptionally(), is(i == 3));
+			if (i == 3) {
+				// Assert outside of a whenComplete callback, which would swallow assertion errors.
+				Throwable reported = slotFuture.handle((LogicalSlot ignored, Throwable throwable) -> throwable).join();
+				// A dependent stage would report the failure wrapped in a CompletionException.
+				Throwable failure = reported instanceof java.util.concurrent.CompletionException ? reported.getCause() : reported;
+				assertThat(failure instanceof SharedSlotOversubscribedException, is(true));
+				assertThat(((SharedSlotOversubscribedException) failure).canRetry(), is(true));
+			}
+		}
+
+		// The multi-task slot for coLocation should be kept.
+		assertThat(context.slotSharingManager.getTaskSlot(context.coLocationTaskSlot.getSlotRequestId()), notNullValue());
+	}
+
+	/** With a (3.0, 300) slot only the third request is expected to succeed; the others fail retryably. */
+	@Test
+	public void testSlotOverAllocatedAndMultiTaskSlotReleased() {
+		SlotSharingResourceTestContext context = createResourceTestContext(new ResourceProfile(3.0, 300));
+
+		for (int i = 0; i < context.singleTaskSlotsInOrder.size(); ++i) {
+			CompletableFuture<LogicalSlot> slotFuture = context.singleTaskSlotsInOrder.get(i).getLogicalSlotFuture();
+			assertThat(slotFuture.isDone(), is(true));
+			// Only the third request (index 2) is fulfilled.
+			assertThat(slotFuture.isCompletedExceptionally(), is(i != 2));
+			if (i != 2) {
+				// Assert outside of a whenComplete callback, which would swallow assertion errors.
+				Throwable reported = slotFuture.handle((LogicalSlot ignored, Throwable throwable) -> throwable).join();
+				// A dependent stage would report the failure wrapped in a CompletionException.
+				Throwable failure = reported instanceof java.util.concurrent.CompletionException ? reported.getCause() : reported;
+				assertThat(failure instanceof SharedSlotOversubscribedException, is(true));
+				assertThat(((SharedSlotOversubscribedException) failure).canRetry(), is(true));
+			}
+		}
+
+		// The multi-task slot for coLocation should not be kept.
+		assertThat(context.slotSharingManager.getTaskSlot(context.coLocationTaskSlot.getSlotRequestId()), nullValue());
+	}
+
+	/** A (2.0, 200) slot is expected to serve none of the requests; all fail without the option to retry. */
+	@Test
+	public void testSlotOverAllocatedAndAllTaskSlotReleased() {
+		SlotSharingResourceTestContext context = createResourceTestContext(new ResourceProfile(2.0, 200));
+
+		for (SlotSharingManager.SingleTaskSlot singleTaskSlot : context.singleTaskSlotsInOrder) {
+			CompletableFuture<LogicalSlot> slotFuture = singleTaskSlot.getLogicalSlotFuture();
+			assertThat(slotFuture.isDone(), is(true));
+			assertThat(slotFuture.isCompletedExceptionally(), is(true));
+			// Assert outside of a whenComplete callback, which would swallow assertion errors.
+			Throwable reported = slotFuture.handle((LogicalSlot ignored, Throwable throwable) -> throwable).join();
+			// A dependent stage would report the failure wrapped in a CompletionException.
+			Throwable failure = reported instanceof java.util.concurrent.CompletionException ? reported.getCause() : reported;
+			assertThat(failure instanceof SharedSlotOversubscribedException, is(true));
+			// Since no request is fulfilled, these requests will be failed and should not retry.
+			assertThat(((SharedSlotOversubscribedException) failure).canRetry(), is(false));
+		}
+
+		// All the task slots should be removed.
+		assertThat(context.slotSharingManager.isEmpty(), is(true));
+	}
+
+	/**
+	 * Builds a slot sharing hierarchy with four pending requests and then resolves its root slot.
+	 *
+	 * <p>Two requests of (2.0, 200) each are placed into a nested multi task slot below the root;
+	 * a third request of (3.0, 300) and a fourth request of (9.0, 900) are direct children of the
+	 * root. Finally the root's slot context future is completed with an allocated slot that has
+	 * the given resource profile.
+	 *
+	 * @param allocatedResourceProfile resource profile of the allocated slot backing the root
+	 * @return the manager, the nested multi task slot and the four single task slots in request order
+	 */
+	private SlotSharingResourceTestContext createResourceTestContext(ResourceProfile allocatedResourceProfile) {
+		final ResourceProfile coLocatedProfile = new ResourceProfile(2.0, 200);
+		final ResourceProfile thirdProfile = new ResourceProfile(3.0, 300);
+		final ResourceProfile fourthProfile = new ResourceProfile(9.0, 900);
+
+		final TestingAllocatedSlotActions slotActions = new TestingAllocatedSlotActions();
+		final SlotSharingManager manager =
+				new SlotSharingManager(SLOT_SHARING_GROUP_ID, slotActions, SLOT_OWNER);
+
+		final CompletableFuture<SlotContext> rootSlotContext = new CompletableFuture<>();
+		final SlotSharingManager.MultiTaskSlot rootSlot =
+				manager.createRootSlot(new SlotRequestId(), rootSlotContext, new SlotRequestId());
+
+		// The nested multi task slot hosts the two co-located requests.
+		final SlotSharingManager.MultiTaskSlot coLocationSlot =
+				rootSlot.allocateMultiTaskSlot(new SlotRequestId(), new SlotSharingGroupId());
+		final SlotSharingManager.SingleTaskSlot firstCoLocated =
+				coLocationSlot.allocateSingleTaskSlot(
+						new SlotRequestId(), coLocatedProfile, new SlotSharingGroupId(), Locality.LOCAL);
+		final SlotSharingManager.SingleTaskSlot secondCoLocated =
+				coLocationSlot.allocateSingleTaskSlot(
+						new SlotRequestId(), coLocatedProfile, new SlotSharingGroupId(), Locality.LOCAL);
+
+		// The remaining two requests are direct children of the root slot.
+		final SlotSharingManager.SingleTaskSlot third =
+				rootSlot.allocateSingleTaskSlot(
+						new SlotRequestId(), thirdProfile, new SlotSharingGroupId(), Locality.LOCAL);
+		final SlotSharingManager.SingleTaskSlot fourth =
+				rootSlot.allocateSingleTaskSlot(
+						new SlotRequestId(), fourthProfile, new SlotSharingGroupId(), Locality.LOCAL);
+
+		// Complete the root's slot context future only after all requests have been placed.
+		final TaskManagerLocation location =
+				new TaskManagerLocation(new ResourceID("tm-X"), InetAddress.getLoopbackAddress(), 46);
+		final AllocatedSlot allocatedSlot = new AllocatedSlot(
+				new AllocationID(),
+				location,
+				0,
+				allocatedResourceProfile,
+				mock(TaskManagerGateway.class));
+		rootSlotContext.complete(allocatedSlot);
+
+		final List<SlotSharingManager.SingleTaskSlot> requestsInOrder =
+				Arrays.asList(firstCoLocated, secondCoLocated, third, fourth);
+		return new SlotSharingResourceTestContext(manager, coLocationSlot, requestsInOrder);
+	}
+
+	/**
+	 * A utility class that holds the slot sharing hierarchy created for the resource tests. It is
+	 * static because it never accesses the state of the enclosing test instance.
+	 */
+	private static class SlotSharingResourceTestContext {
+		final SlotSharingManager slotSharingManager;
+		final SlotSharingManager.MultiTaskSlot coLocationTaskSlot;
+		final List<SlotSharingManager.SingleTaskSlot> singleTaskSlotsInOrder;
+
+		SlotSharingResourceTestContext(
+				@Nonnull SlotSharingManager slotSharingManager,
+				@Nonnull SlotSharingManager.MultiTaskSlot coLocationTaskSlot,
+				@Nonnull List<SlotSharingManager.SingleTaskSlot> singleTaskSlotsInOrder) {
+			this.slotSharingManager = slotSharingManager;
+			this.coLocationTaskSlot = coLocationTaskSlot;
+			this.singleTaskSlotsInOrder = singleTaskSlotsInOrder;
+		}
+	}
 }