You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/15 07:55:13 UTC

[2/2] flink git commit: [FLINK-7928] Move Resource out of ResourceSpec

[FLINK-7928] Move Resource out of ResourceSpec


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/76e3156d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/76e3156d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/76e3156d

Branch: refs/heads/master
Commit: 76e3156d014e12094bbb55ecbd024b5edbf4b4cf
Parents: 5643d15
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Dec 14 16:25:43 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 14 16:25:43 2017 +0100

----------------------------------------------------------------------
 .../api/common/operators/ResourceSpec.java      | 141 ++-----------------
 .../flink/api/common/resources/GPUResource.java |  44 ++++++
 .../flink/api/common/resources/Resource.java    | 123 ++++++++++++++++
 .../clusterframework/types/ResourceProfile.java |  59 ++++----
 .../org/apache/flink/api/scala/DataSet.scala    |   2 +-
 5 files changed, 209 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/76e3156d/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
index e82c738..7bc2948 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
@@ -19,7 +19,8 @@
 package org.apache.flink.api.common.operators;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.api.common.resources.GPUResource;
+import org.apache.flink.api.common.resources.Resource;
 
 import javax.annotation.Nonnull;
 
@@ -28,8 +29,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * Describe the different resource factors of the operator with UDF.
  *
@@ -97,7 +96,7 @@ public class ResourceSpec implements Serializable {
 		this.stateSizeInMB = stateSizeInMB;
 		for (Resource resource : extendedResources) {
 			if (resource != null) {
-				this.extendedResources.put(resource.name, resource);
+				this.extendedResources.put(resource.getName(), resource);
 			}
 		}
 	}
@@ -118,7 +117,7 @@ public class ResourceSpec implements Serializable {
 				this.stateSizeInMB + other.stateSizeInMB);
 		target.extendedResources.putAll(extendedResources);
 		for (Resource resource : other.extendedResources.values()) {
-			target.extendedResources.merge(resource.name, resource, (v1, v2) -> v1.merge(v2));
+			target.extendedResources.merge(resource.getName(), resource, (v1, v2) -> v1.merge(v2));
 		}
 		return target;
 	}
@@ -148,9 +147,14 @@ public class ResourceSpec implements Serializable {
 		if (gpuResource != null) {
 			return gpuResource.getValue();
 		}
+
 		return 0.0;
 	}
 
+	public Map<String, Resource> getExtendedResources() {
+		return extendedResources;
+	}
+
 	/**
 	 * Check whether all the field values are valid.
 	 *
@@ -160,7 +164,7 @@ public class ResourceSpec implements Serializable {
 		if (this.cpuCores >= 0 && this.heapMemoryInMB >= 0 && this.directMemoryInMB >= 0 &&
 				this.nativeMemoryInMB >= 0 && this.stateSizeInMB >= 0) {
 			for (Resource resource : extendedResources.values()) {
-				if (resource.value < 0) {
+				if (resource.getValue() < 0) {
 					return false;
 				}
 			}
@@ -185,9 +189,9 @@ public class ResourceSpec implements Serializable {
 		int cmp5 = Integer.compare(this.stateSizeInMB, other.stateSizeInMB);
 		if (cmp1 <= 0 && cmp2 <= 0 && cmp3 <= 0 && cmp4 <= 0 && cmp5 <= 0) {
 			for (Resource resource : extendedResources.values()) {
-				if (!other.extendedResources.containsKey(resource.name) ||
-						!other.extendedResources.get(resource.name).type.equals(resource.type) ||
-						other.extendedResources.get(resource.name).value < resource.value) {
+				if (!other.extendedResources.containsKey(resource.getName()) ||
+					other.extendedResources.get(resource.getName()).getResourceAggregateType() != resource.getResourceAggregateType() ||
+						other.extendedResources.get(resource.getName()).getValue() < resource.getValue()) {
 					return false;
 				}
 			}
@@ -229,7 +233,7 @@ public class ResourceSpec implements Serializable {
 	public String toString() {
 		String extend = "";
 		for (Resource resource : extendedResources.values()) {
-			extend += ", " + resource.name + "=" + resource.value;
+			extend += ", " + resource.getName() + "=" + resource.getValue();
 		}
 		return "ResourceSpec{" +
 				"cpuCores=" + cpuCores +
@@ -297,121 +301,4 @@ public class ResourceSpec implements Serializable {
 		}
 	}
 
-	/**
-	 * Base class for additional resources one can specify.
-	 */
-	public abstract static class Resource implements Serializable {
-
-		private static final long serialVersionUID = 1L;
-
-		/**
-		 * Enum defining how resources are aggregated.
-		 */
-		public enum ResourceAggregateType {
-			/**
-			 * Denotes keeping the sum of the values with same name when merging two resource specs for operator chaining.
-			 */
-			AGGREGATE_TYPE_SUM,
-
-			/**
-			 * Denotes keeping the max of the values with same name when merging two resource specs for operator chaining.
-			 */
-			AGGREGATE_TYPE_MAX
-		}
-
-		private final String name;
-
-		private final double value;
-
-		private final ResourceAggregateType type;
-
-		public Resource(String name, double value, ResourceAggregateType type) {
-			this.name = checkNotNull(name);
-			this.value = value;
-			this.type = checkNotNull(type);
-		}
-
-		Resource merge(Resource other) {
-			Preconditions.checkArgument(getClass() == other.getClass(), "Merge with different resource type");
-			Preconditions.checkArgument(this.name.equals(other.name), "Merge with different resource name");
-			Preconditions.checkArgument(this.type.equals(other.type), "Merge with different aggregate type");
-
-			Double value = null;
-			switch (type) {
-				case AGGREGATE_TYPE_MAX :
-					value = Math.max(other.value, this.value);
-					break;
-
-				case AGGREGATE_TYPE_SUM:
-				default:
-					value = this.value + other.value;
-			}
-
-			Resource resource = create(value, type);
-			return resource;
-		}
-
-		@Override
-		public boolean equals(Object o) {
-			if (this == o) {
-				return true;
-			} else if (o != null && getClass() == o.getClass()) {
-				Resource other = (Resource) o;
-
-				return name.equals(other.name) && type.equals(other.type) && value == other.value;
-			} else {
-				return false;
-			}
-		}
-
-		@Override
-		public int hashCode() {
-			int result = name.hashCode();
-			result = 31 * result + type.ordinal();
-			result = 31 * result + (int) value;
-			return result;
-		}
-
-		public String getName() {
-			return this.name;
-		}
-
-		public ResourceAggregateType getAggregateType() {
-			return this.type;
-		}
-
-		public double getValue() {
-			return this.value;
-		}
-
-		/**
-		 * Create a resource of the same resource type.
-		 *
-		 * @param value The value of the resource
-		 * @param type The aggregate type of the resource
-		 * @return A new instance of the sub resource
-		 */
-		protected abstract Resource create(double value, ResourceAggregateType type);
-	}
-
-	/**
-	 * The GPU resource.
-	 */
-	public static class GPUResource extends Resource {
-
-		private static final long serialVersionUID = -2276080061777135142L;
-
-		public GPUResource(double value) {
-			this(value, ResourceAggregateType.AGGREGATE_TYPE_SUM);
-		}
-
-		public GPUResource(double value, ResourceAggregateType type) {
-			super(GPU_NAME, value, type);
-		}
-
-		@Override
-		public Resource create(double value, ResourceAggregateType type) {
-			return new GPUResource(value, type);
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/76e3156d/flink-core/src/main/java/org/apache/flink/api/common/resources/GPUResource.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/resources/GPUResource.java b/flink-core/src/main/java/org/apache/flink/api/common/resources/GPUResource.java
new file mode 100644
index 0000000..1e7f1ef
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/resources/GPUResource.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.resources;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.ResourceSpec;
+
+/**
+ * The GPU resource: a {@link Resource} registered under the name {@link ResourceSpec#GPU_NAME}.
+ */
+@Internal
+public class GPUResource extends Resource {
+
+	private static final long serialVersionUID = -2276080061777135142L;
+
+	public GPUResource(double value) {
+		this(value, ResourceAggregateType.AGGREGATE_TYPE_SUM); // the only public constructor fixes the aggregate type to SUM
+	}
+
+	private GPUResource(double value, ResourceAggregateType type) {
+		super(ResourceSpec.GPU_NAME, value, type);
+	}
+
+	@Override
+	public Resource create(double value, ResourceAggregateType type) {
+		return new GPUResource(value, type); // factory hook declared abstract in Resource; returns the concrete subtype
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/76e3156d/flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java b/flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java
new file mode 100644
index 0000000..59448a8
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.resources;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for resources one can specify: an immutable triple of name, value and aggregate type.
+ */
+@Internal
+public abstract class Resource implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Enum defining how resources are aggregated.
+	 */
+	public enum ResourceAggregateType {
+		/**
+		 * Denotes keeping the sum of the values with same name when merging two resource specs for operator chaining.
+		 */
+		AGGREGATE_TYPE_SUM,
+
+		/**
+		 * Denotes keeping the max of the values with same name when merging two resource specs for operator chaining.
+		 */
+		AGGREGATE_TYPE_MAX
+	}
+
+	private final String name;
+
+	private final double value;
+
+	private final ResourceAggregateType resourceAggregateType;
+
+	protected Resource(String name, double value, ResourceAggregateType type) {
+		this.name = checkNotNull(name);
+		this.value = value;
+		this.resourceAggregateType = checkNotNull(type);
+	}
+
+	public Resource merge(Resource other) {
+		Preconditions.checkArgument(getClass() == other.getClass(), "Merge with different resource type");
+		Preconditions.checkArgument(this.name.equals(other.name), "Merge with different resource name");
+		Preconditions.checkArgument(this.resourceAggregateType == other.resourceAggregateType, "Merge with different aggregate type");
+
+		final double aggregatedValue;
+		switch (resourceAggregateType) {
+			case AGGREGATE_TYPE_MAX :
+				aggregatedValue = Math.max(other.value, this.value);
+				break;
+
+			case AGGREGATE_TYPE_SUM:
+			default: // any aggregate type other than MAX is treated as a sum
+				aggregatedValue = this.value + other.value;
+		}
+
+		return create(aggregatedValue, resourceAggregateType); // delegate to the subclass so the merged resource keeps its concrete type
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		} else if (o != null && getClass() == o.getClass()) { // exact class match, subclasses are never equal to each other
+			Resource other = (Resource) o;
+
+			return name.equals(other.name) && resourceAggregateType == other.resourceAggregateType && value == other.value;
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		int result = name.hashCode();
+		result = 31 * result + resourceAggregateType.ordinal();
+		result = 31 * result + (int) value; // truncating cast: the fractional part of the value does not contribute to the hash
+		return result;
+	}
+
+	public String getName() {
+		return name;
+	}
+
+	public ResourceAggregateType getResourceAggregateType() {
+		return this.resourceAggregateType;
+	}
+
+	public double getValue() {
+		return this.value;
+	}
+
+	/**
+	 * Create a resource of the same resource type.
+	 *
+	 * @param value The value of the resource
+	 * @param type The aggregate type of the resource
+	 * @return A new instance of the sub resource
+	 */
+	protected abstract Resource create(double value, ResourceAggregateType type);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/76e3156d/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index fc0bf15..3dec3f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.clusterframework.types;
 
+import org.apache.flink.api.common.resources.Resource;
 import org.apache.flink.api.common.operators.ResourceSpec;
 
 import javax.annotation.Nonnull;
@@ -34,7 +35,7 @@ import java.util.Objects;
  * checked whether it can match another profile's requirement, and furthermore we may calculate a matching
  * score to decide which profile we should choose when we have lots of candidate slots.
  * It should be generated from {@link ResourceSpec} with the input and output memory calculated in JobMaster.
- * 
+ *
  * <p>Resource Profiles have a total ordering, defined by comparing these fields in sequence:
  * <ol>
  *     <li>Memory Size</li>
@@ -51,20 +52,20 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 
 	// ------------------------------------------------------------------------
 
-	/** How many cpu cores are needed, use double so we can specify cpu like 0.1 */
+	/** How many cpu cores are needed, use double so we can specify cpu like 0.1. */
 	private final double cpuCores;
 
-	/** How many heap memory in mb are needed */
+	/** How many heap memory in mb are needed. */
 	private final int heapMemoryInMB;
 
-	/** How many direct memory in mb are needed */
+	/** How many direct memory in mb are needed. */
 	private final int directMemoryInMB;
 
-	/** How many native memory in mb are needed */
+	/** How many native memory in mb are needed. */
 	private final int nativeMemoryInMB;
 
 	/** A extensible field for user specified resources from {@link ResourceSpec}. */
-	private final Map<String, ResourceSpec.Resource> extendedResources = new HashMap<>(1);
+	private final Map<String, Resource> extendedResources = new HashMap<>(1);
 
 	// ------------------------------------------------------------------------
 
@@ -82,7 +83,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 			int heapMemoryInMB,
 			int directMemoryInMB,
 			int nativeMemoryInMB,
-			Map<String, ResourceSpec.Resource> extendedResources) {
+			Map<String, Resource> extendedResources) {
 		this.cpuCores = cpuCores;
 		this.heapMemoryInMB = heapMemoryInMB;
 		this.directMemoryInMB = directMemoryInMB;
@@ -104,8 +105,8 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 
 	/**
 	 * Creates a copy of the given ResourceProfile.
-	 * 
-	 * @param other The ResourceProfile to copy. 
+	 *
+	 * @param other The ResourceProfile to copy.
 	 */
 	public ResourceProfile(ResourceProfile other) {
 		this(other.cpuCores, other.heapMemoryInMB, other.directMemoryInMB, other.nativeMemoryInMB, other.extendedResources);
@@ -172,7 +173,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	 *
 	 * @return The extended resources
 	 */
-	public Map<String, ResourceSpec.Resource> getExtendedResources() {
+	public Map<String, Resource> getExtendedResources() {
 		return Collections.unmodifiableMap(extendedResources);
 	}
 
@@ -187,9 +188,9 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 				heapMemoryInMB >= required.getHeapMemoryInMB() &&
 				directMemoryInMB >= required.getDirectMemoryInMB() &&
 				nativeMemoryInMB >= required.getNativeMemoryInMB()) {
-			for (Map.Entry<String, ResourceSpec.Resource> resource : required.extendedResources.entrySet()) {
+			for (Map.Entry<String, Resource> resource : required.extendedResources.entrySet()) {
 				if (!extendedResources.containsKey(resource.getKey()) ||
-						!extendedResources.get(resource.getKey()).getAggregateType().equals(resource.getValue().getAggregateType()) ||
+						!extendedResources.get(resource.getKey()).getResourceAggregateType().equals(resource.getValue().getResourceAggregateType()) ||
 						extendedResources.get(resource.getKey()).getValue() < resource.getValue().getValue()) {
 					return false;
 				}
@@ -206,15 +207,15 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 			cmp = Double.compare(this.cpuCores, other.cpuCores);
 		}
 		if (cmp == 0) {
-			Iterator<Map.Entry<String, ResourceSpec.Resource>> thisIterator = extendedResources.entrySet().iterator();
-			Iterator<Map.Entry<String, ResourceSpec.Resource>> otherIterator = other.extendedResources.entrySet().iterator();
+			Iterator<Map.Entry<String, Resource>> thisIterator = extendedResources.entrySet().iterator();
+			Iterator<Map.Entry<String, Resource>> otherIterator = other.extendedResources.entrySet().iterator();
 			while (thisIterator.hasNext() && otherIterator.hasNext()) {
-				Map.Entry<String, ResourceSpec.Resource> thisResource = thisIterator.next();
-				Map.Entry<String, ResourceSpec.Resource> otherResource = otherIterator.next();
+				Map.Entry<String, Resource> thisResource = thisIterator.next();
+				Map.Entry<String, Resource> otherResource = otherIterator.next();
 				if ((cmp = otherResource.getKey().compareTo(thisResource.getKey())) != 0) {
 					return cmp;
 				}
-				if (!otherResource.getValue().getAggregateType().equals(thisResource.getValue().getAggregateType())) {
+				if (!otherResource.getValue().getResourceAggregateType().equals(thisResource.getValue().getResourceAggregateType())) {
 					return 1;
 				}
 				if ((cmp = Double.compare(thisResource.getValue().getValue(), otherResource.getValue().getValue())) != 0) {
@@ -261,32 +262,26 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 
 	@Override
 	public String toString() {
-		String resourceStr = "";
-		for (Map.Entry<String, ResourceSpec.Resource> resource : extendedResources.entrySet()) {
-			resourceStr += ", " + resource.getKey() + "=" + resource.getValue();
+		final StringBuilder resources = new StringBuilder(extendedResources.size() * 10);
+		for (Map.Entry<String, Resource> resource : extendedResources.entrySet()) {
+			resources.append(", ").append(resource.getKey()).append('=').append(resource.getValue());
 		}
 		return "ResourceProfile{" +
 			"cpuCores=" + cpuCores +
 			", heapMemoryInMB=" + heapMemoryInMB +
 			", directMemoryInMB=" + directMemoryInMB +
-			", nativeMemoryInMB=" + nativeMemoryInMB + resourceStr +
+			", nativeMemoryInMB=" + nativeMemoryInMB + resources +
 			'}';
 	}
 
-	public static ResourceProfile fromResourceSpec(
-			ResourceSpec resourceSpec) {
-		Map<String, ResourceSpec.Resource> extendResource = new HashMap<>(1);
-		double gpu = resourceSpec.getGPUResource();
-		if (gpu > 0) {
-			extendResource.put(ResourceSpec.GPU_NAME, new ResourceSpec.GPUResource(gpu));
-		}
-		ResourceProfile resourceProfile = new ResourceProfile(
+	static ResourceProfile fromResourceSpec(ResourceSpec resourceSpec) {
+		Map<String, Resource> copiedExtendedResources = new HashMap<>(resourceSpec.getExtendedResources());
+
+		return new ResourceProfile(
 				resourceSpec.getCpuCores(),
 				resourceSpec.getHeapMemory(),
 				resourceSpec.getDirectMemory(),
 				resourceSpec.getNativeMemory(),
-				extendResource
-		);
-		return resourceProfile;
+				copiedExtendedResources);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/76e3156d/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index bfe7567..fd36568 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.accumulators.SerializedListAccumulator
 import org.apache.flink.api.common.aggregators.Aggregator
 import org.apache.flink.api.common.functions._
 import org.apache.flink.api.common.io.{FileOutputFormat, OutputFormat}
-import org.apache.flink.api.common.operators.{ResourceSpec, Keys, Order}
+import org.apache.flink.api.common.operators.{Keys, Order, ResourceSpec}
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
 import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
 import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod