You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/15 07:55:13 UTC
[2/2] flink git commit: [FLINK-7928] Move Resource out of ResourceSpec
[FLINK-7928] Move Resource out of ResourceSpec
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/76e3156d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/76e3156d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/76e3156d
Branch: refs/heads/master
Commit: 76e3156d014e12094bbb55ecbd024b5edbf4b4cf
Parents: 5643d15
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Dec 14 16:25:43 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 14 16:25:43 2017 +0100
----------------------------------------------------------------------
.../api/common/operators/ResourceSpec.java | 141 ++-----------------
.../flink/api/common/resources/GPUResource.java | 44 ++++++
.../flink/api/common/resources/Resource.java | 123 ++++++++++++++++
.../clusterframework/types/ResourceProfile.java | 59 ++++----
.../org/apache/flink/api/scala/DataSet.scala | 2 +-
5 files changed, 209 insertions(+), 160 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/76e3156d/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
index e82c738..7bc2948 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
@@ -19,7 +19,8 @@
package org.apache.flink.api.common.operators;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.api.common.resources.GPUResource;
+import org.apache.flink.api.common.resources.Resource;
import javax.annotation.Nonnull;
@@ -28,8 +29,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
/**
* Describe the different resource factors of the operator with UDF.
*
@@ -97,7 +96,7 @@ public class ResourceSpec implements Serializable {
this.stateSizeInMB = stateSizeInMB;
for (Resource resource : extendedResources) {
if (resource != null) {
- this.extendedResources.put(resource.name, resource);
+ this.extendedResources.put(resource.getName(), resource);
}
}
}
@@ -118,7 +117,7 @@ public class ResourceSpec implements Serializable {
this.stateSizeInMB + other.stateSizeInMB);
target.extendedResources.putAll(extendedResources);
for (Resource resource : other.extendedResources.values()) {
- target.extendedResources.merge(resource.name, resource, (v1, v2) -> v1.merge(v2));
+ target.extendedResources.merge(resource.getName(), resource, (v1, v2) -> v1.merge(v2));
}
return target;
}
@@ -148,9 +147,14 @@ public class ResourceSpec implements Serializable {
if (gpuResource != null) {
return gpuResource.getValue();
}
+
return 0.0;
}
+ public Map<String, Resource> getExtendedResources() {
+ return extendedResources;
+ }
+
/**
* Check whether all the field values are valid.
*
@@ -160,7 +164,7 @@ public class ResourceSpec implements Serializable {
if (this.cpuCores >= 0 && this.heapMemoryInMB >= 0 && this.directMemoryInMB >= 0 &&
this.nativeMemoryInMB >= 0 && this.stateSizeInMB >= 0) {
for (Resource resource : extendedResources.values()) {
- if (resource.value < 0) {
+ if (resource.getValue() < 0) {
return false;
}
}
@@ -185,9 +189,9 @@ public class ResourceSpec implements Serializable {
int cmp5 = Integer.compare(this.stateSizeInMB, other.stateSizeInMB);
if (cmp1 <= 0 && cmp2 <= 0 && cmp3 <= 0 && cmp4 <= 0 && cmp5 <= 0) {
for (Resource resource : extendedResources.values()) {
- if (!other.extendedResources.containsKey(resource.name) ||
- !other.extendedResources.get(resource.name).type.equals(resource.type) ||
- other.extendedResources.get(resource.name).value < resource.value) {
+ if (!other.extendedResources.containsKey(resource.getName()) ||
+ other.extendedResources.get(resource.getName()).getResourceAggregateType() != resource.getResourceAggregateType() ||
+ other.extendedResources.get(resource.getName()).getValue() < resource.getValue()) {
return false;
}
}
@@ -229,7 +233,7 @@ public class ResourceSpec implements Serializable {
public String toString() {
String extend = "";
for (Resource resource : extendedResources.values()) {
- extend += ", " + resource.name + "=" + resource.value;
+ extend += ", " + resource.getName() + "=" + resource.getValue();
}
return "ResourceSpec{" +
"cpuCores=" + cpuCores +
@@ -297,121 +301,4 @@ public class ResourceSpec implements Serializable {
}
}
- /**
- * Base class for additional resources one can specify.
- */
- public abstract static class Resource implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- /**
- * Enum defining how resources are aggregated.
- */
- public enum ResourceAggregateType {
- /**
- * Denotes keeping the sum of the values with same name when merging two resource specs for operator chaining.
- */
- AGGREGATE_TYPE_SUM,
-
- /**
- * Denotes keeping the max of the values with same name when merging two resource specs for operator chaining.
- */
- AGGREGATE_TYPE_MAX
- }
-
- private final String name;
-
- private final double value;
-
- private final ResourceAggregateType type;
-
- public Resource(String name, double value, ResourceAggregateType type) {
- this.name = checkNotNull(name);
- this.value = value;
- this.type = checkNotNull(type);
- }
-
- Resource merge(Resource other) {
- Preconditions.checkArgument(getClass() == other.getClass(), "Merge with different resource type");
- Preconditions.checkArgument(this.name.equals(other.name), "Merge with different resource name");
- Preconditions.checkArgument(this.type.equals(other.type), "Merge with different aggregate type");
-
- Double value = null;
- switch (type) {
- case AGGREGATE_TYPE_MAX :
- value = Math.max(other.value, this.value);
- break;
-
- case AGGREGATE_TYPE_SUM:
- default:
- value = this.value + other.value;
- }
-
- Resource resource = create(value, type);
- return resource;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- } else if (o != null && getClass() == o.getClass()) {
- Resource other = (Resource) o;
-
- return name.equals(other.name) && type.equals(other.type) && value == other.value;
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- int result = name.hashCode();
- result = 31 * result + type.ordinal();
- result = 31 * result + (int) value;
- return result;
- }
-
- public String getName() {
- return this.name;
- }
-
- public ResourceAggregateType getAggregateType() {
- return this.type;
- }
-
- public double getValue() {
- return this.value;
- }
-
- /**
- * Create a resource of the same resource type.
- *
- * @param value The value of the resource
- * @param type The aggregate type of the resource
- * @return A new instance of the sub resource
- */
- protected abstract Resource create(double value, ResourceAggregateType type);
- }
-
- /**
- * The GPU resource.
- */
- public static class GPUResource extends Resource {
-
- private static final long serialVersionUID = -2276080061777135142L;
-
- public GPUResource(double value) {
- this(value, ResourceAggregateType.AGGREGATE_TYPE_SUM);
- }
-
- public GPUResource(double value, ResourceAggregateType type) {
- super(GPU_NAME, value, type);
- }
-
- @Override
- public Resource create(double value, ResourceAggregateType type) {
- return new GPUResource(value, type);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/76e3156d/flink-core/src/main/java/org/apache/flink/api/common/resources/GPUResource.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/resources/GPUResource.java b/flink-core/src/main/java/org/apache/flink/api/common/resources/GPUResource.java
new file mode 100644
index 0000000..1e7f1ef
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/resources/GPUResource.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.resources;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.ResourceSpec;
+
+/**
+ * The GPU resource.
+ */
+@Internal
+public class GPUResource extends Resource {
+
+ private static final long serialVersionUID = -2276080061777135142L;
+
+ public GPUResource(double value) {
+ this(value, ResourceAggregateType.AGGREGATE_TYPE_SUM);
+ }
+
+ private GPUResource(double value, ResourceAggregateType type) {
+ super(ResourceSpec.GPU_NAME, value, type);
+ }
+
+ @Override
+ public Resource create(double value, ResourceAggregateType type) {
+ return new GPUResource(value, type);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/76e3156d/flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java b/flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java
new file mode 100644
index 0000000..59448a8
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.resources;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for resources one can specify.
+ */
+@Internal
+public abstract class Resource implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Enum defining how resources are aggregated.
+ */
+ public enum ResourceAggregateType {
+ /**
+ * Denotes keeping the sum of the values with same name when merging two resource specs for operator chaining.
+ */
+ AGGREGATE_TYPE_SUM,
+
+ /**
+ * Denotes keeping the max of the values with same name when merging two resource specs for operator chaining.
+ */
+ AGGREGATE_TYPE_MAX
+ }
+
+ private final String name;
+
+ private final double value;
+
+ private final ResourceAggregateType resourceAggregateType;
+
+ protected Resource(String name, double value, ResourceAggregateType type) {
+ this.name = checkNotNull(name);
+ this.value = value;
+ this.resourceAggregateType = checkNotNull(type);
+ }
+
+ public Resource merge(Resource other) {
+ Preconditions.checkArgument(getClass() == other.getClass(), "Merge with different resource resourceAggregateType");
+ Preconditions.checkArgument(this.name.equals(other.name), "Merge with different resource name");
+ Preconditions.checkArgument(this.resourceAggregateType == other.resourceAggregateType, "Merge with different aggregate resourceAggregateType");
+
+ final double aggregatedValue;
+ switch (resourceAggregateType) {
+ case AGGREGATE_TYPE_MAX :
+ aggregatedValue = Math.max(other.value, this.value);
+ break;
+
+ case AGGREGATE_TYPE_SUM:
+ default:
+ aggregatedValue = this.value + other.value;
+ }
+
+ return create(aggregatedValue, resourceAggregateType);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ } else if (o != null && getClass() == o.getClass()) {
+ Resource other = (Resource) o;
+
+ return name.equals(other.name) && resourceAggregateType == other.resourceAggregateType && value == other.value;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ int result = name.hashCode();
+ result = 31 * result + resourceAggregateType.ordinal();
+ result = 31 * result + (int) value;
+ return result;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public ResourceAggregateType getResourceAggregateType() {
+ return this.resourceAggregateType;
+ }
+
+ public double getValue() {
+ return this.value;
+ }
+
+ /**
+ * Create a resource of the same resource type.
+ *
+ * @param value The value of the resource
+ * @param type The aggregate type of the resource
+ * @return A new instance of the sub resource
+ */
+ protected abstract Resource create(double value, ResourceAggregateType type);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/76e3156d/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index fc0bf15..3dec3f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.clusterframework.types;
+import org.apache.flink.api.common.resources.Resource;
import org.apache.flink.api.common.operators.ResourceSpec;
import javax.annotation.Nonnull;
@@ -34,7 +35,7 @@ import java.util.Objects;
* checked whether it can match another profile's requirement, and furthermore we may calculate a matching
* score to decide which profile we should choose when we have lots of candidate slots.
* It should be generated from {@link ResourceSpec} with the input and output memory calculated in JobMaster.
- *
+ *
* <p>Resource Profiles have a total ordering, defined by comparing these fields in sequence:
* <ol>
* <li>Memory Size</li>
@@ -51,20 +52,20 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
// ------------------------------------------------------------------------
- /** How many cpu cores are needed, use double so we can specify cpu like 0.1 */
+ /** How many cpu cores are needed, use double so we can specify cpu like 0.1. */
private final double cpuCores;
- /** How many heap memory in mb are needed */
+ /** How many heap memory in mb are needed. */
private final int heapMemoryInMB;
- /** How many direct memory in mb are needed */
+ /** How many direct memory in mb are needed. */
private final int directMemoryInMB;
- /** How many native memory in mb are needed */
+ /** How many native memory in mb are needed. */
private final int nativeMemoryInMB;
/** A extensible field for user specified resources from {@link ResourceSpec}. */
- private final Map<String, ResourceSpec.Resource> extendedResources = new HashMap<>(1);
+ private final Map<String, Resource> extendedResources = new HashMap<>(1);
// ------------------------------------------------------------------------
@@ -82,7 +83,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
int heapMemoryInMB,
int directMemoryInMB,
int nativeMemoryInMB,
- Map<String, ResourceSpec.Resource> extendedResources) {
+ Map<String, Resource> extendedResources) {
this.cpuCores = cpuCores;
this.heapMemoryInMB = heapMemoryInMB;
this.directMemoryInMB = directMemoryInMB;
@@ -104,8 +105,8 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
/**
* Creates a copy of the given ResourceProfile.
- *
- * @param other The ResourceProfile to copy.
+ *
+ * @param other The ResourceProfile to copy.
*/
public ResourceProfile(ResourceProfile other) {
this(other.cpuCores, other.heapMemoryInMB, other.directMemoryInMB, other.nativeMemoryInMB, other.extendedResources);
@@ -172,7 +173,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
*
* @return The extended resources
*/
- public Map<String, ResourceSpec.Resource> getExtendedResources() {
+ public Map<String, Resource> getExtendedResources() {
return Collections.unmodifiableMap(extendedResources);
}
@@ -187,9 +188,9 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
heapMemoryInMB >= required.getHeapMemoryInMB() &&
directMemoryInMB >= required.getDirectMemoryInMB() &&
nativeMemoryInMB >= required.getNativeMemoryInMB()) {
- for (Map.Entry<String, ResourceSpec.Resource> resource : required.extendedResources.entrySet()) {
+ for (Map.Entry<String, Resource> resource : required.extendedResources.entrySet()) {
if (!extendedResources.containsKey(resource.getKey()) ||
- !extendedResources.get(resource.getKey()).getAggregateType().equals(resource.getValue().getAggregateType()) ||
+ !extendedResources.get(resource.getKey()).getResourceAggregateType().equals(resource.getValue().getResourceAggregateType()) ||
extendedResources.get(resource.getKey()).getValue() < resource.getValue().getValue()) {
return false;
}
@@ -206,15 +207,15 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
cmp = Double.compare(this.cpuCores, other.cpuCores);
}
if (cmp == 0) {
- Iterator<Map.Entry<String, ResourceSpec.Resource>> thisIterator = extendedResources.entrySet().iterator();
- Iterator<Map.Entry<String, ResourceSpec.Resource>> otherIterator = other.extendedResources.entrySet().iterator();
+ Iterator<Map.Entry<String, Resource>> thisIterator = extendedResources.entrySet().iterator();
+ Iterator<Map.Entry<String, Resource>> otherIterator = other.extendedResources.entrySet().iterator();
while (thisIterator.hasNext() && otherIterator.hasNext()) {
- Map.Entry<String, ResourceSpec.Resource> thisResource = thisIterator.next();
- Map.Entry<String, ResourceSpec.Resource> otherResource = otherIterator.next();
+ Map.Entry<String, Resource> thisResource = thisIterator.next();
+ Map.Entry<String, Resource> otherResource = otherIterator.next();
if ((cmp = otherResource.getKey().compareTo(thisResource.getKey())) != 0) {
return cmp;
}
- if (!otherResource.getValue().getAggregateType().equals(thisResource.getValue().getAggregateType())) {
+ if (!otherResource.getValue().getResourceAggregateType().equals(thisResource.getValue().getResourceAggregateType())) {
return 1;
}
if ((cmp = Double.compare(thisResource.getValue().getValue(), otherResource.getValue().getValue())) != 0) {
@@ -261,32 +262,26 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
@Override
public String toString() {
- String resourceStr = "";
- for (Map.Entry<String, ResourceSpec.Resource> resource : extendedResources.entrySet()) {
- resourceStr += ", " + resource.getKey() + "=" + resource.getValue();
+ final StringBuilder resources = new StringBuilder(extendedResources.size() * 10);
+ for (Map.Entry<String, Resource> resource : extendedResources.entrySet()) {
+ resources.append(", ").append(resource.getKey()).append('=').append(resource.getValue());
}
return "ResourceProfile{" +
"cpuCores=" + cpuCores +
", heapMemoryInMB=" + heapMemoryInMB +
", directMemoryInMB=" + directMemoryInMB +
- ", nativeMemoryInMB=" + nativeMemoryInMB + resourceStr +
+ ", nativeMemoryInMB=" + nativeMemoryInMB + resources +
'}';
}
- public static ResourceProfile fromResourceSpec(
- ResourceSpec resourceSpec) {
- Map<String, ResourceSpec.Resource> extendResource = new HashMap<>(1);
- double gpu = resourceSpec.getGPUResource();
- if (gpu > 0) {
- extendResource.put(ResourceSpec.GPU_NAME, new ResourceSpec.GPUResource(gpu));
- }
- ResourceProfile resourceProfile = new ResourceProfile(
+ static ResourceProfile fromResourceSpec(ResourceSpec resourceSpec) {
+ Map<String, Resource> copiedExtendedResources = new HashMap<>(resourceSpec.getExtendedResources());
+
+ return new ResourceProfile(
resourceSpec.getCpuCores(),
resourceSpec.getHeapMemory(),
resourceSpec.getDirectMemory(),
resourceSpec.getNativeMemory(),
- extendResource
- );
- return resourceProfile;
+ copiedExtendedResources);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/76e3156d/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index bfe7567..fd36568 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.accumulators.SerializedListAccumulator
import org.apache.flink.api.common.aggregators.Aggregator
import org.apache.flink.api.common.functions._
import org.apache.flink.api.common.io.{FileOutputFormat, OutputFormat}
-import org.apache.flink.api.common.operators.{ResourceSpec, Keys, Order}
+import org.apache.flink.api.common.operators.{Keys, Order, ResourceSpec}
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod