You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/15 07:55:12 UTC

[1/2] flink git commit: [FLINK-7928] [runtime] extend the resources in ResourceProfile for precisely calculating the resource of task manager

Repository: flink
Updated Branches:
  refs/heads/master fba72d073 -> 76e3156d0


[FLINK-7928] [runtime] extend the resources in ResourceProfile for precisely calculating the resource of task manager

Summary:
ResourceProfile denotes the resource requirements of a task. It should contains:
1. The resource for the operators: the resources in ResourceSpec (please refer to jira-7878)
2. The resource for the task to communicate with its upstreams.
3. The resource for the task to communicate with its downstreams.
Now the ResourceProfile only contains the first part. Adding the last two parts.

Test Plan: UnitTests

Reviewers: haitao.w

Differential Revision: https://aone.alibaba-inc.com/code/D330364

This closes #4991.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5643d156
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5643d156
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5643d156

Branch: refs/heads/master
Commit: 5643d156cea72314c2240119b30aa32a65a0aeb7
Parents: fba72d0
Author: shuai.xus <sh...@alibaba-inc.com>
Authored: Thu Oct 26 17:38:04 2017 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 14 15:51:18 2017 +0100

----------------------------------------------------------------------
 .../api/common/operators/ResourceSpec.java      |  22 ++-
 .../clusterframework/types/ResourceProfile.java | 146 +++++++++++++++----
 .../types/ResourceProfileTest.java              | 100 ++++++++++++-
 .../flink/yarn/YarnResourceManagerTest.java     |   3 +-
 4 files changed, 234 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5643d156/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
index 4554b54..e82c738 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
@@ -54,7 +54,7 @@ public class ResourceSpec implements Serializable {
 
 	public static final ResourceSpec DEFAULT = new ResourceSpec(0, 0, 0, 0, 0);
 
-	private static final String GPU_NAME = "GPU";
+	public static final String GPU_NAME = "GPU";
 
 	/** How many cpu cores are needed, use double so we can specify cpu like 0.1. */
 	private final double cpuCores;
@@ -146,7 +146,7 @@ public class ResourceSpec implements Serializable {
 	public double getGPUResource() {
 		Resource gpuResource = extendedResources.get(GPU_NAME);
 		if (gpuResource != null) {
-			return gpuResource.value;
+			return gpuResource.getValue();
 		}
 		return 0.0;
 	}
@@ -249,8 +249,8 @@ public class ResourceSpec implements Serializable {
 	 */
 	public static class Builder {
 
-		public double cpuCores;
-		public int heapMemoryInMB;
+		private double cpuCores;
+		private int heapMemoryInMB;
 		private int directMemoryInMB;
 		private int nativeMemoryInMB;
 		private int stateSizeInMB;
@@ -300,7 +300,7 @@ public class ResourceSpec implements Serializable {
 	/**
 	 * Base class for additional resources one can specify.
 	 */
-	protected abstract static class Resource implements Serializable {
+	public abstract static class Resource implements Serializable {
 
 		private static final long serialVersionUID = 1L;
 
@@ -372,6 +372,18 @@ public class ResourceSpec implements Serializable {
 			return result;
 		}
 
+		public String getName() {
+			return this.name;
+		}
+
+		public ResourceAggregateType getAggregateType() {
+			return this.type;
+		}
+
+		public double getValue() {
+			return this.value;
+		}
+
 		/**
 		 * Create a resource of the same resource type.
 		 *

http://git-wip-us.apache.org/repos/asf/flink/blob/5643d156/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index faa93e5..fc0bf15 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -18,19 +18,30 @@
 
 package org.apache.flink.runtime.clusterframework.types;
 
+import org.apache.flink.api.common.operators.ResourceSpec;
+
 import javax.annotation.Nonnull;
+
 import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
 
 /**
- * Describe the resource profile of the slot, either when requiring or offering it. The profile can be
+ * Describe the immutable resource profile of the slot, either when requiring or offering it. The profile can be
  * checked whether it can match another profile's requirement, and furthermore we may calculate a matching
  * score to decide which profile we should choose when we have lots of candidate slots.
+ * It should be generated from {@link ResourceSpec} with the input and output memory calculated in JobMaster.
  * 
  * <p>Resource Profiles have a total ordering, defined by comparing these fields in sequence:
  * <ol>
  *     <li>Memory Size</li>
  *     <li>CPU cores</li>
+ *     <li>Extended resources</li>
  * </ol>
+ * The extended resources are compared ordered by the resource names.
  */
 public class ResourceProfile implements Serializable, Comparable<ResourceProfile> {
 
@@ -52,6 +63,9 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	/** How many native memory in mb are needed */
 	private final int nativeMemoryInMB;
 
+	/** A extensible field for user specified resources from {@link ResourceSpec}. */
+	private final Map<String, ResourceSpec.Resource> extendedResources = new HashMap<>(1);
+
 	// ------------------------------------------------------------------------
 
 	/**
@@ -61,16 +75,21 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	 * @param heapMemoryInMB The size of the heap memory, in megabytes.
 	 * @param directMemoryInMB The size of the direct memory, in megabytes.
 	 * @param nativeMemoryInMB The size of the native memory, in megabytes.
+	 * @param extendedResources The extendiable resources such as GPU and FPGA
 	 */
 	public ResourceProfile(
 			double cpuCores,
 			int heapMemoryInMB,
 			int directMemoryInMB,
-			int nativeMemoryInMB) {
+			int nativeMemoryInMB,
+			Map<String, ResourceSpec.Resource> extendedResources) {
 		this.cpuCores = cpuCores;
 		this.heapMemoryInMB = heapMemoryInMB;
 		this.directMemoryInMB = directMemoryInMB;
 		this.nativeMemoryInMB = nativeMemoryInMB;
+		if (extendedResources != null) {
+			this.extendedResources.putAll(extendedResources);
+		}
 	}
 
 	/**
@@ -80,10 +99,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	 * @param heapMemoryInMB The size of the heap memory, in megabytes.
 	 */
 	public ResourceProfile(double cpuCores, int heapMemoryInMB) {
-		this.cpuCores = cpuCores;
-		this.heapMemoryInMB = heapMemoryInMB;
-		this.directMemoryInMB = 0;
-		this.nativeMemoryInMB = 0;
+		this(cpuCores, heapMemoryInMB, 0, 0, Collections.EMPTY_MAP);
 	}
 
 	/**
@@ -92,16 +108,14 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	 * @param other The ResourceProfile to copy. 
 	 */
 	public ResourceProfile(ResourceProfile other) {
-		this.cpuCores = other.cpuCores;
-		this.heapMemoryInMB = other.heapMemoryInMB;
-		this.directMemoryInMB = other.directMemoryInMB;
-		this.nativeMemoryInMB = other.nativeMemoryInMB;
+		this(other.cpuCores, other.heapMemoryInMB, other.directMemoryInMB, other.nativeMemoryInMB, other.extendedResources);
 	}
 
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Get the cpu cores needed
+	 * Get the cpu cores needed.
+	 *
 	 * @return The cpu cores, 1.0 means a full cpu thread
 	 */
 	public double getCpuCores() {
@@ -109,15 +123,17 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	}
 
 	/**
-	 * Get the heap memory needed in MB
+	 * Get the heap memory needed in MB.
+	 *
 	 * @return The heap memory in MB
 	 */
-	public long getHeapMemoryInMB() {
+	public int getHeapMemoryInMB() {
 		return heapMemoryInMB;
 	}
 
 	/**
-	 * Get the direct memory needed in MB
+	 * Get the direct memory needed in MB.
+	 *
 	 * @return The direct memory in MB
 	 */
 	public int getDirectMemoryInMB() {
@@ -125,7 +141,8 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	}
 
 	/**
-	 * Get the native memory needed in MB
+	 * Get the native memory needed in MB.
+	 *
 	 * @return The native memory in MB
 	 */
 	public int getNativeMemoryInMB() {
@@ -133,7 +150,8 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	}
 
 	/**
-	 * Get the total memory needed in MB
+	 * Get the total memory needed in MB.
+	 *
 	 * @return The total memory in MB
 	 */
 	public int getMemoryInMB() {
@@ -141,23 +159,76 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	}
 
 	/**
-	 * Check whether required resource profile can be matched
+	 * Get the memory the operators needed in MB.
+	 *
+	 * @return The operator memory in MB
+	 */
+	public int getOperatorsMemoryInMB() {
+		return heapMemoryInMB + directMemoryInMB + nativeMemoryInMB;
+	}
+
+	/**
+	 * Get the extended resources.
+	 *
+	 * @return The extended resources
+	 */
+	public Map<String, ResourceSpec.Resource> getExtendedResources() {
+		return Collections.unmodifiableMap(extendedResources);
+	}
+
+	/**
+	 * Check whether required resource profile can be matched.
 	 *
 	 * @param required the required resource profile
 	 * @return true if the requirement is matched, otherwise false
 	 */
 	public boolean isMatching(ResourceProfile required) {
-		return cpuCores >= required.getCpuCores() &&
+		if (cpuCores >= required.getCpuCores() &&
 				heapMemoryInMB >= required.getHeapMemoryInMB() &&
 				directMemoryInMB >= required.getDirectMemoryInMB() &&
-				nativeMemoryInMB >= required.getNativeMemoryInMB();
+				nativeMemoryInMB >= required.getNativeMemoryInMB()) {
+			for (Map.Entry<String, ResourceSpec.Resource> resource : required.extendedResources.entrySet()) {
+				if (!extendedResources.containsKey(resource.getKey()) ||
+						!extendedResources.get(resource.getKey()).getAggregateType().equals(resource.getValue().getAggregateType()) ||
+						extendedResources.get(resource.getKey()).getValue() < resource.getValue().getValue()) {
+					return false;
+				}
+			}
+			return true;
+		}
+		return false;
 	}
 
 	@Override
 	public int compareTo(@Nonnull ResourceProfile other) {
-		int cmp1 = Integer.compare(this.getMemoryInMB(), other.getMemoryInMB());
-		int cmp2 = Double.compare(this.cpuCores, other.cpuCores);
-		return (cmp1 != 0) ? cmp1 : cmp2;
+		int cmp = Integer.compare(this.getMemoryInMB(), other.getMemoryInMB());
+		if (cmp == 0) {
+			cmp = Double.compare(this.cpuCores, other.cpuCores);
+		}
+		if (cmp == 0) {
+			Iterator<Map.Entry<String, ResourceSpec.Resource>> thisIterator = extendedResources.entrySet().iterator();
+			Iterator<Map.Entry<String, ResourceSpec.Resource>> otherIterator = other.extendedResources.entrySet().iterator();
+			while (thisIterator.hasNext() && otherIterator.hasNext()) {
+				Map.Entry<String, ResourceSpec.Resource> thisResource = thisIterator.next();
+				Map.Entry<String, ResourceSpec.Resource> otherResource = otherIterator.next();
+				if ((cmp = otherResource.getKey().compareTo(thisResource.getKey())) != 0) {
+					return cmp;
+				}
+				if (!otherResource.getValue().getAggregateType().equals(thisResource.getValue().getAggregateType())) {
+					return 1;
+				}
+				if ((cmp = Double.compare(thisResource.getValue().getValue(), otherResource.getValue().getValue())) != 0) {
+					return cmp;
+				}
+			}
+			if (thisIterator.hasNext()) {
+				return 1;
+			}
+			if (otherIterator.hasNext()) {
+				return -1;
+			}
+		}
+		return cmp;
 	}
 
 	// ------------------------------------------------------------------------
@@ -169,6 +240,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 		result = 31 * result + heapMemoryInMB;
 		result = 31 * result + directMemoryInMB;
 		result = 31 * result + nativeMemoryInMB;
+		result = 31 * result + extendedResources.hashCode();
 		return result;
 	}
 
@@ -181,20 +253,40 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 			ResourceProfile that = (ResourceProfile) obj;
 			return this.cpuCores == that.cpuCores &&
 					this.heapMemoryInMB == that.heapMemoryInMB &&
-					this.directMemoryInMB == that.directMemoryInMB;
-		}
-		else {
-			return false;
+					this.directMemoryInMB == that.directMemoryInMB &&
+					Objects.equals(extendedResources, that.extendedResources);
 		}
+		return false;
 	}
 
 	@Override
 	public String toString() {
+		String resourceStr = "";
+		for (Map.Entry<String, ResourceSpec.Resource> resource : extendedResources.entrySet()) {
+			resourceStr += ", " + resource.getKey() + "=" + resource.getValue();
+		}
 		return "ResourceProfile{" +
 			"cpuCores=" + cpuCores +
 			", heapMemoryInMB=" + heapMemoryInMB +
 			", directMemoryInMB=" + directMemoryInMB +
-			", nativeMemoryInMB=" + nativeMemoryInMB +
+			", nativeMemoryInMB=" + nativeMemoryInMB + resourceStr +
 			'}';
 	}
+
+	public static ResourceProfile fromResourceSpec(
+			ResourceSpec resourceSpec) {
+		Map<String, ResourceSpec.Resource> extendResource = new HashMap<>(1);
+		double gpu = resourceSpec.getGPUResource();
+		if (gpu > 0) {
+			extendResource.put(ResourceSpec.GPU_NAME, new ResourceSpec.GPUResource(gpu));
+		}
+		ResourceProfile resourceProfile = new ResourceProfile(
+				resourceSpec.getCpuCores(),
+				resourceSpec.getHeapMemory(),
+				resourceSpec.getDirectMemory(),
+				resourceSpec.getNativeMemory(),
+				extendResource
+		);
+		return resourceProfile;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5643d156/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
index aacdcfa..25cb5fb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
@@ -18,8 +18,12 @@
 
 package org.apache.flink.runtime.clusterframework.types;
 
+import org.apache.flink.api.common.operators.ResourceSpec;
 import org.junit.Test;
 
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -27,10 +31,10 @@ public class ResourceProfileTest {
 
 	@Test
 	public void testMatchRequirement() throws Exception {
-		ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100);
-		ResourceProfile rp2 = new ResourceProfile(1.0, 200, 200, 200);
-		ResourceProfile rp3 = new ResourceProfile(2.0, 100, 100, 100);
-		ResourceProfile rp4 = new ResourceProfile(2.0, 200, 200, 200);
+		ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, Collections.EMPTY_MAP);
+		ResourceProfile rp2 = new ResourceProfile(1.0, 200, 200, 200, Collections.EMPTY_MAP);
+		ResourceProfile rp3 = new ResourceProfile(2.0, 100, 100, 100, Collections.EMPTY_MAP);
+		ResourceProfile rp4 = new ResourceProfile(2.0, 200, 200, 200, Collections.EMPTY_MAP);
 
 		assertFalse(rp1.isMatching(rp2));
 		assertTrue(rp2.isMatching(rp1));
@@ -45,10 +49,98 @@ public class ResourceProfileTest {
 		assertTrue(rp4.isMatching(rp2));
 		assertTrue(rp4.isMatching(rp3));
 		assertTrue(rp4.isMatching(rp4));
+
+		ResourceSpec rs1 = ResourceSpec.newBuilder().
+				setCpuCores(1.0).
+				setHeapMemoryInMB(100).
+				setGPUResource(2.2).
+				build();
+		ResourceSpec rs2 = ResourceSpec.newBuilder().
+				setCpuCores(1.0).
+				setHeapMemoryInMB(100).
+				setGPUResource(1.1).
+				build();
+
+
+		assertFalse(rp1.isMatching(ResourceProfile.fromResourceSpec(rs1)));
+		assertTrue(ResourceProfile.fromResourceSpec(rs1).isMatching(ResourceProfile.fromResourceSpec(rs2)));
+		assertFalse(ResourceProfile.fromResourceSpec(rs2).isMatching(ResourceProfile.fromResourceSpec(rs1)));
 	}
 
 	@Test
 	public void testUnknownMatchesUnknown() {
 		assertTrue(ResourceProfile.UNKNOWN.isMatching(ResourceProfile.UNKNOWN));
 	}
+
+	@Test
+	public void testEquals() throws Exception {
+		ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
+		ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
+		assertTrue(ResourceProfile.fromResourceSpec(rs1).equals(ResourceProfile.fromResourceSpec(rs2)));
+
+		ResourceSpec rs3 = ResourceSpec.newBuilder().
+				setCpuCores(1.0).
+				setHeapMemoryInMB(100).
+				setGPUResource(2.2).
+				build();
+		ResourceSpec rs4 = ResourceSpec.newBuilder().
+				setCpuCores(1.0).
+				setHeapMemoryInMB(100).
+				setGPUResource(1.1).
+				build();
+		assertFalse(ResourceProfile.fromResourceSpec(rs3).equals(ResourceProfile.fromResourceSpec(rs4)));
+
+		ResourceSpec rs5 = ResourceSpec.newBuilder().
+				setCpuCores(1.0).
+				setHeapMemoryInMB(100).
+				setGPUResource(2.2).
+				build();
+		assertTrue(ResourceProfile.fromResourceSpec(rs3).equals(ResourceProfile.fromResourceSpec(rs5)));
+	}
+
+	@Test
+	public void testCompareTo() throws Exception {
+		ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
+		ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
+		assertEquals(0, ResourceProfile.fromResourceSpec(rs1).compareTo(ResourceProfile.fromResourceSpec(rs2)));
+
+		ResourceSpec rs3 = ResourceSpec.newBuilder().
+				setCpuCores(1.0).
+				setHeapMemoryInMB(100).
+				setGPUResource(2.2).
+				build();
+		assertEquals(-1, ResourceProfile.fromResourceSpec(rs1).compareTo(ResourceProfile.fromResourceSpec(rs3)));
+		assertEquals(1, ResourceProfile.fromResourceSpec(rs3).compareTo(ResourceProfile.fromResourceSpec(rs1)));
+
+		ResourceSpec rs4 = ResourceSpec.newBuilder().
+				setCpuCores(1.0).
+				setHeapMemoryInMB(100).
+				setGPUResource(1.1).
+				build();
+		assertEquals(1, ResourceProfile.fromResourceSpec(rs3).compareTo(ResourceProfile.fromResourceSpec(rs4)));
+		assertEquals(-1, ResourceProfile.fromResourceSpec(rs4).compareTo(ResourceProfile.fromResourceSpec(rs3)));
+
+
+		ResourceSpec rs5 = ResourceSpec.newBuilder().
+				setCpuCores(1.0).
+				setHeapMemoryInMB(100).
+				setGPUResource(2.2).
+				build();
+		assertEquals(0, ResourceProfile.fromResourceSpec(rs3).compareTo(ResourceProfile.fromResourceSpec(rs5)));
+	}
+
+	@Test
+	public void testGet() throws Exception {
+		ResourceSpec rs = ResourceSpec.newBuilder().
+				setCpuCores(1.0).
+				setHeapMemoryInMB(100).
+				setGPUResource(1.6).
+				build();
+		ResourceProfile rp = ResourceProfile.fromResourceSpec(rs);
+
+		assertEquals(1.0, rp.getCpuCores(), 0.000001);
+		assertEquals(100, rp.getMemoryInMB());
+		assertEquals(100, rp.getOperatorsMemoryInMB());
+		assertEquals(1.6, rp.getExtendedResources().get(ResourceSpec.GPU_NAME).getValue(), 0.000001);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5643d156/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index 252b3a8..cd08fb9 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -77,6 +77,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.io.File;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -346,7 +347,7 @@ public class YarnResourceManagerTest extends TestLogger {
 			final SlotReport slotReport = new SlotReport(
 				new SlotStatus(
 					new SlotID(taskManagerResourceId, 1),
-					new ResourceProfile(10, 1, 1, 1)));
+					new ResourceProfile(10, 1, 1, 1, Collections.emptyMap())));
 
 			CompletableFuture<Integer> numberRegisteredSlotsFuture = rmGateway
 				.registerTaskExecutor(


[2/2] flink git commit: [FLINK-7928] Move Resource out of ResourceSpec

Posted by tr...@apache.org.
[FLINK-7928] Move Resource out of ResourceSpec


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/76e3156d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/76e3156d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/76e3156d

Branch: refs/heads/master
Commit: 76e3156d014e12094bbb55ecbd024b5edbf4b4cf
Parents: 5643d15
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Dec 14 16:25:43 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 14 16:25:43 2017 +0100

----------------------------------------------------------------------
 .../api/common/operators/ResourceSpec.java      | 141 ++-----------------
 .../flink/api/common/resources/GPUResource.java |  44 ++++++
 .../flink/api/common/resources/Resource.java    | 123 ++++++++++++++++
 .../clusterframework/types/ResourceProfile.java |  59 ++++----
 .../org/apache/flink/api/scala/DataSet.scala    |   2 +-
 5 files changed, 209 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/76e3156d/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
index e82c738..7bc2948 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
@@ -19,7 +19,8 @@
 package org.apache.flink.api.common.operators;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.api.common.resources.GPUResource;
+import org.apache.flink.api.common.resources.Resource;
 
 import javax.annotation.Nonnull;
 
@@ -28,8 +29,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * Describe the different resource factors of the operator with UDF.
  *
@@ -97,7 +96,7 @@ public class ResourceSpec implements Serializable {
 		this.stateSizeInMB = stateSizeInMB;
 		for (Resource resource : extendedResources) {
 			if (resource != null) {
-				this.extendedResources.put(resource.name, resource);
+				this.extendedResources.put(resource.getName(), resource);
 			}
 		}
 	}
@@ -118,7 +117,7 @@ public class ResourceSpec implements Serializable {
 				this.stateSizeInMB + other.stateSizeInMB);
 		target.extendedResources.putAll(extendedResources);
 		for (Resource resource : other.extendedResources.values()) {
-			target.extendedResources.merge(resource.name, resource, (v1, v2) -> v1.merge(v2));
+			target.extendedResources.merge(resource.getName(), resource, (v1, v2) -> v1.merge(v2));
 		}
 		return target;
 	}
@@ -148,9 +147,14 @@ public class ResourceSpec implements Serializable {
 		if (gpuResource != null) {
 			return gpuResource.getValue();
 		}
+
 		return 0.0;
 	}
 
+	public Map<String, Resource> getExtendedResources() {
+		return extendedResources;
+	}
+
 	/**
 	 * Check whether all the field values are valid.
 	 *
@@ -160,7 +164,7 @@ public class ResourceSpec implements Serializable {
 		if (this.cpuCores >= 0 && this.heapMemoryInMB >= 0 && this.directMemoryInMB >= 0 &&
 				this.nativeMemoryInMB >= 0 && this.stateSizeInMB >= 0) {
 			for (Resource resource : extendedResources.values()) {
-				if (resource.value < 0) {
+				if (resource.getValue() < 0) {
 					return false;
 				}
 			}
@@ -185,9 +189,9 @@ public class ResourceSpec implements Serializable {
 		int cmp5 = Integer.compare(this.stateSizeInMB, other.stateSizeInMB);
 		if (cmp1 <= 0 && cmp2 <= 0 && cmp3 <= 0 && cmp4 <= 0 && cmp5 <= 0) {
 			for (Resource resource : extendedResources.values()) {
-				if (!other.extendedResources.containsKey(resource.name) ||
-						!other.extendedResources.get(resource.name).type.equals(resource.type) ||
-						other.extendedResources.get(resource.name).value < resource.value) {
+				if (!other.extendedResources.containsKey(resource.getName()) ||
+					other.extendedResources.get(resource.getName()).getResourceAggregateType() != resource.getResourceAggregateType() ||
+						other.extendedResources.get(resource.getName()).getValue() < resource.getValue()) {
 					return false;
 				}
 			}
@@ -229,7 +233,7 @@ public class ResourceSpec implements Serializable {
 	public String toString() {
 		String extend = "";
 		for (Resource resource : extendedResources.values()) {
-			extend += ", " + resource.name + "=" + resource.value;
+			extend += ", " + resource.getName() + "=" + resource.getValue();
 		}
 		return "ResourceSpec{" +
 				"cpuCores=" + cpuCores +
@@ -297,121 +301,4 @@ public class ResourceSpec implements Serializable {
 		}
 	}
 
-	/**
-	 * Base class for additional resources one can specify.
-	 */
-	public abstract static class Resource implements Serializable {
-
-		private static final long serialVersionUID = 1L;
-
-		/**
-		 * Enum defining how resources are aggregated.
-		 */
-		public enum ResourceAggregateType {
-			/**
-			 * Denotes keeping the sum of the values with same name when merging two resource specs for operator chaining.
-			 */
-			AGGREGATE_TYPE_SUM,
-
-			/**
-			 * Denotes keeping the max of the values with same name when merging two resource specs for operator chaining.
-			 */
-			AGGREGATE_TYPE_MAX
-		}
-
-		private final String name;
-
-		private final double value;
-
-		private final ResourceAggregateType type;
-
-		public Resource(String name, double value, ResourceAggregateType type) {
-			this.name = checkNotNull(name);
-			this.value = value;
-			this.type = checkNotNull(type);
-		}
-
-		Resource merge(Resource other) {
-			Preconditions.checkArgument(getClass() == other.getClass(), "Merge with different resource type");
-			Preconditions.checkArgument(this.name.equals(other.name), "Merge with different resource name");
-			Preconditions.checkArgument(this.type.equals(other.type), "Merge with different aggregate type");
-
-			Double value = null;
-			switch (type) {
-				case AGGREGATE_TYPE_MAX :
-					value = Math.max(other.value, this.value);
-					break;
-
-				case AGGREGATE_TYPE_SUM:
-				default:
-					value = this.value + other.value;
-			}
-
-			Resource resource = create(value, type);
-			return resource;
-		}
-
-		@Override
-		public boolean equals(Object o) {
-			if (this == o) {
-				return true;
-			} else if (o != null && getClass() == o.getClass()) {
-				Resource other = (Resource) o;
-
-				return name.equals(other.name) && type.equals(other.type) && value == other.value;
-			} else {
-				return false;
-			}
-		}
-
-		@Override
-		public int hashCode() {
-			int result = name.hashCode();
-			result = 31 * result + type.ordinal();
-			result = 31 * result + (int) value;
-			return result;
-		}
-
-		public String getName() {
-			return this.name;
-		}
-
-		public ResourceAggregateType getAggregateType() {
-			return this.type;
-		}
-
-		public double getValue() {
-			return this.value;
-		}
-
-		/**
-		 * Create a resource of the same resource type.
-		 *
-		 * @param value The value of the resource
-		 * @param type The aggregate type of the resource
-		 * @return A new instance of the sub resource
-		 */
-		protected abstract Resource create(double value, ResourceAggregateType type);
-	}
-
-	/**
-	 * The GPU resource.
-	 */
-	public static class GPUResource extends Resource {
-
-		private static final long serialVersionUID = -2276080061777135142L;
-
-		public GPUResource(double value) {
-			this(value, ResourceAggregateType.AGGREGATE_TYPE_SUM);
-		}
-
-		public GPUResource(double value, ResourceAggregateType type) {
-			super(GPU_NAME, value, type);
-		}
-
-		@Override
-		public Resource create(double value, ResourceAggregateType type) {
-			return new GPUResource(value, type);
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/76e3156d/flink-core/src/main/java/org/apache/flink/api/common/resources/GPUResource.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/resources/GPUResource.java b/flink-core/src/main/java/org/apache/flink/api/common/resources/GPUResource.java
new file mode 100644
index 0000000..1e7f1ef
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/resources/GPUResource.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.resources;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.ResourceSpec;
+
+/**
+ * The GPU resource.
+ */
+@Internal
+public class GPUResource extends Resource {
+
+	private static final long serialVersionUID = -2276080061777135142L;
+
+	public GPUResource(double value) {
+		this(value, ResourceAggregateType.AGGREGATE_TYPE_SUM);
+	}
+
+	private GPUResource(double value, ResourceAggregateType type) {
+		super(ResourceSpec.GPU_NAME, value, type);
+	}
+
+	@Override
+	public Resource create(double value, ResourceAggregateType type) {
+		return new GPUResource(value, type);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/76e3156d/flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java b/flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java
new file mode 100644
index 0000000..59448a8
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.resources;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for resources one can specify.
+ */
+@Internal
+public abstract class Resource implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Enum defining how resources are aggregated.
+	 */
+	public enum ResourceAggregateType {
+		/**
+		 * Denotes keeping the sum of the values with same name when merging two resource specs for operator chaining.
+		 */
+		AGGREGATE_TYPE_SUM,
+
+		/**
+		 * Denotes keeping the max of the values with same name when merging two resource specs for operator chaining.
+		 */
+		AGGREGATE_TYPE_MAX
+	}
+
+	private final String name;
+
+	private final double value;
+
+	private final ResourceAggregateType resourceAggregateType;
+
+	protected Resource(String name, double value, ResourceAggregateType type) {
+		this.name = checkNotNull(name);
+		this.value = value;
+		this.resourceAggregateType = checkNotNull(type);
+	}
+
+	public Resource merge(Resource other) {
+		Preconditions.checkArgument(getClass() == other.getClass(), "Merge with different resource resourceAggregateType");
+		Preconditions.checkArgument(this.name.equals(other.name), "Merge with different resource name");
+		Preconditions.checkArgument(this.resourceAggregateType == other.resourceAggregateType, "Merge with different aggregate resourceAggregateType");
+
+		final double aggregatedValue;
+		switch (resourceAggregateType) {
+			case AGGREGATE_TYPE_MAX :
+				aggregatedValue = Math.max(other.value, this.value);
+				break;
+
+			case AGGREGATE_TYPE_SUM:
+			default:
+				aggregatedValue = this.value + other.value;
+		}
+
+		return create(aggregatedValue, resourceAggregateType);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		} else if (o != null && getClass() == o.getClass()) {
+			Resource other = (Resource) o;
+
+			return name.equals(other.name) && resourceAggregateType == other.resourceAggregateType && value == other.value;
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		int result = name.hashCode();
+		result = 31 * result + resourceAggregateType.ordinal();
+		result = 31 * result + (int) value;
+		return result;
+	}
+
+	public String getName() {
+		return name;
+	}
+
+	public ResourceAggregateType getResourceAggregateType() {
+		return this.resourceAggregateType;
+	}
+
+	public double getValue() {
+		return this.value;
+	}
+
+	/**
+	 * Create a resource of the same resource resourceAggregateType.
+	 *
+	 * @param value The value of the resource
+	 * @param type The aggregate resourceAggregateType of the resource
+	 * @return A new instance of the sub resource
+	 */
+	protected abstract Resource create(double value, ResourceAggregateType type);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/76e3156d/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index fc0bf15..3dec3f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.clusterframework.types;
 
+import org.apache.flink.api.common.resources.Resource;
 import org.apache.flink.api.common.operators.ResourceSpec;
 
 import javax.annotation.Nonnull;
@@ -34,7 +35,7 @@ import java.util.Objects;
  * checked whether it can match another profile's requirement, and furthermore we may calculate a matching
  * score to decide which profile we should choose when we have lots of candidate slots.
  * It should be generated from {@link ResourceSpec} with the input and output memory calculated in JobMaster.
- * 
+ *
  * <p>Resource Profiles have a total ordering, defined by comparing these fields in sequence:
  * <ol>
  *     <li>Memory Size</li>
@@ -51,20 +52,20 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 
 	// ------------------------------------------------------------------------
 
-	/** How many cpu cores are needed, use double so we can specify cpu like 0.1 */
+	/** How many cpu cores are needed, use double so we can specify cpu like 0.1. */
 	private final double cpuCores;
 
-	/** How many heap memory in mb are needed */
+	/** How many heap memory in mb are needed. */
 	private final int heapMemoryInMB;
 
-	/** How many direct memory in mb are needed */
+	/** How many direct memory in mb are needed. */
 	private final int directMemoryInMB;
 
-	/** How many native memory in mb are needed */
+	/** How many native memory in mb are needed. */
 	private final int nativeMemoryInMB;
 
 	/** A extensible field for user specified resources from {@link ResourceSpec}. */
-	private final Map<String, ResourceSpec.Resource> extendedResources = new HashMap<>(1);
+	private final Map<String, Resource> extendedResources = new HashMap<>(1);
 
 	// ------------------------------------------------------------------------
 
@@ -82,7 +83,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 			int heapMemoryInMB,
 			int directMemoryInMB,
 			int nativeMemoryInMB,
-			Map<String, ResourceSpec.Resource> extendedResources) {
+			Map<String, Resource> extendedResources) {
 		this.cpuCores = cpuCores;
 		this.heapMemoryInMB = heapMemoryInMB;
 		this.directMemoryInMB = directMemoryInMB;
@@ -104,8 +105,8 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 
 	/**
 	 * Creates a copy of the given ResourceProfile.
-	 * 
-	 * @param other The ResourceProfile to copy. 
+	 *
+	 * @param other The ResourceProfile to copy.
 	 */
 	public ResourceProfile(ResourceProfile other) {
 		this(other.cpuCores, other.heapMemoryInMB, other.directMemoryInMB, other.nativeMemoryInMB, other.extendedResources);
@@ -172,7 +173,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	 *
 	 * @return The extended resources
 	 */
-	public Map<String, ResourceSpec.Resource> getExtendedResources() {
+	public Map<String, Resource> getExtendedResources() {
 		return Collections.unmodifiableMap(extendedResources);
 	}
 
@@ -187,9 +188,9 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 				heapMemoryInMB >= required.getHeapMemoryInMB() &&
 				directMemoryInMB >= required.getDirectMemoryInMB() &&
 				nativeMemoryInMB >= required.getNativeMemoryInMB()) {
-			for (Map.Entry<String, ResourceSpec.Resource> resource : required.extendedResources.entrySet()) {
+			for (Map.Entry<String, Resource> resource : required.extendedResources.entrySet()) {
 				if (!extendedResources.containsKey(resource.getKey()) ||
-						!extendedResources.get(resource.getKey()).getAggregateType().equals(resource.getValue().getAggregateType()) ||
+						!extendedResources.get(resource.getKey()).getResourceAggregateType().equals(resource.getValue().getResourceAggregateType()) ||
 						extendedResources.get(resource.getKey()).getValue() < resource.getValue().getValue()) {
 					return false;
 				}
@@ -206,15 +207,15 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 			cmp = Double.compare(this.cpuCores, other.cpuCores);
 		}
 		if (cmp == 0) {
-			Iterator<Map.Entry<String, ResourceSpec.Resource>> thisIterator = extendedResources.entrySet().iterator();
-			Iterator<Map.Entry<String, ResourceSpec.Resource>> otherIterator = other.extendedResources.entrySet().iterator();
+			Iterator<Map.Entry<String, Resource>> thisIterator = extendedResources.entrySet().iterator();
+			Iterator<Map.Entry<String, Resource>> otherIterator = other.extendedResources.entrySet().iterator();
 			while (thisIterator.hasNext() && otherIterator.hasNext()) {
-				Map.Entry<String, ResourceSpec.Resource> thisResource = thisIterator.next();
-				Map.Entry<String, ResourceSpec.Resource> otherResource = otherIterator.next();
+				Map.Entry<String, Resource> thisResource = thisIterator.next();
+				Map.Entry<String, Resource> otherResource = otherIterator.next();
 				if ((cmp = otherResource.getKey().compareTo(thisResource.getKey())) != 0) {
 					return cmp;
 				}
-				if (!otherResource.getValue().getAggregateType().equals(thisResource.getValue().getAggregateType())) {
+				if (!otherResource.getValue().getResourceAggregateType().equals(thisResource.getValue().getResourceAggregateType())) {
 					return 1;
 				}
 				if ((cmp = Double.compare(thisResource.getValue().getValue(), otherResource.getValue().getValue())) != 0) {
@@ -261,32 +262,26 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 
 	@Override
 	public String toString() {
-		String resourceStr = "";
-		for (Map.Entry<String, ResourceSpec.Resource> resource : extendedResources.entrySet()) {
-			resourceStr += ", " + resource.getKey() + "=" + resource.getValue();
+		final StringBuilder resources = new StringBuilder(extendedResources.size() * 10);
+		for (Map.Entry<String, Resource> resource : extendedResources.entrySet()) {
+			resources.append(", ").append(resource.getKey()).append('=').append(resource.getValue());
 		}
 		return "ResourceProfile{" +
 			"cpuCores=" + cpuCores +
 			", heapMemoryInMB=" + heapMemoryInMB +
 			", directMemoryInMB=" + directMemoryInMB +
-			", nativeMemoryInMB=" + nativeMemoryInMB + resourceStr +
+			", nativeMemoryInMB=" + nativeMemoryInMB + resources +
 			'}';
 	}
 
-	public static ResourceProfile fromResourceSpec(
-			ResourceSpec resourceSpec) {
-		Map<String, ResourceSpec.Resource> extendResource = new HashMap<>(1);
-		double gpu = resourceSpec.getGPUResource();
-		if (gpu > 0) {
-			extendResource.put(ResourceSpec.GPU_NAME, new ResourceSpec.GPUResource(gpu));
-		}
-		ResourceProfile resourceProfile = new ResourceProfile(
+	static ResourceProfile fromResourceSpec(ResourceSpec resourceSpec) {
+		Map<String, Resource> copiedExtendedResources = new HashMap<>(resourceSpec.getExtendedResources());
+
+		return new ResourceProfile(
 				resourceSpec.getCpuCores(),
 				resourceSpec.getHeapMemory(),
 				resourceSpec.getDirectMemory(),
 				resourceSpec.getNativeMemory(),
-				extendResource
-		);
-		return resourceProfile;
+				copiedExtendedResources);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/76e3156d/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index bfe7567..fd36568 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.accumulators.SerializedListAccumulator
 import org.apache.flink.api.common.aggregators.Aggregator
 import org.apache.flink.api.common.functions._
 import org.apache.flink.api.common.io.{FileOutputFormat, OutputFormat}
-import org.apache.flink.api.common.operators.{ResourceSpec, Keys, Order}
+import org.apache.flink.api.common.operators.{Keys, Order, ResourceSpec}
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
 import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
 import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod