You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by shuai-xu <gi...@git.apache.org> on 2017/10/27 08:32:15 UTC

[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

GitHub user shuai-xu opened a pull request:

    https://github.com/apache/flink/pull/4911

    [FLINK-7878] [api] make resource type extendible in ResourceSpec

    Summary:
    Now, flink only support user define CPU and MEM,
    but some user need to specify the GPU, FPGA and so on resources.
    So it need to make the resource type extendible in the ResourceSpec.
    Add a extend field for new resources.
    
    ## What is the purpose of the change
    
    This pull request adds a extensible filed to the ResourceSpec, so user can define variable resources only if supported by their resource manager.
    
    *(for example:)*
    user can use 
    _text.flatMap().setResource(new ResourceSpce(1, 100, new ResourceSpce.Resource("GPU", 0.5)));_
    to define their GPU requirement for the operator.
    
    ## Verifying this change
    This change added tests and can be verified as follows:
      - *Added unit tests ResourceSpecTest to verify.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/shuai-xu/flink jira-7878

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4911.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4911
    
----
commit 3e1d61a33f18b351424d4684cbaebc22674f582c
Author: shuai.xus <sh...@alibaba-inc.com>
Date:   2017-10-25T06:56:35Z

    [FLINK-7878] [api] make resource type extendible in ResourceSpec
    
    Summary:
    Now, flink only support user define CPU and MEM,
    but some user need to specify the GPU, FPGA and so on resources.
    So it need to make the resouce type extendible in the ResourceSpec.
    Add a extend field for new resources.
    
    Test Plan: UnitTest
    
    Reviewers: haitao.w
    
    Differential Revision: https://aone.alibaba-inc.com/code/D327427

----


---

[GitHub] flink issue #4911: [FLINK-7878] [api] make resource type extendible in Resou...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4911
  
    Changes look good @shuai-xu. I have a single last question which is whether we want to expose the `GpuResource` to the user or not. Wouldn't it be enough to set the gpus via `Builder#setGpuResource(double)`? Of course, then we weren't able to set the `ResourceAggregateType` but I'm wondering whether GPUs shouldn't be something else than the sum. We actually have the same problem with the cpus. They are hardcoded to be the max of both values.


---

[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4911#discussion_r149096402
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java ---
    @@ -0,0 +1,144 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.operators;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +public class ResourceSpecTest {
    --- End diff --
    
    Tests should extend from the `TestLogger`


---

[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4911#discussion_r149096804
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java ---
    @@ -0,0 +1,144 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.operators;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +public class ResourceSpecTest {
    +
    +	@Test
    +	public void testIsValid() throws Exception {
    +		ResourceSpec rs = new ResourceSpec(1.0, 100);
    +		assertTrue(rs.isValid());
    +
    +		rs = new ResourceSpec(1.0, 100, new ResourceSpec.Resource("GPU", 1));
    +		assertTrue(rs.isValid());
    +
    +		rs = new ResourceSpec(1.0, 100, new ResourceSpec.Resource("GPU", -1));
    +		assertFalse(rs.isValid());
    +	}
    +
    +	@Test
    +	public void testLessThanOrEqual() throws Exception {
    +		ResourceSpec rs1 = new ResourceSpec(1.0, 100);
    +		ResourceSpec rs2 = new ResourceSpec(1.0, 100);
    +		assertTrue(rs1.lessThanOrEqual(rs2));
    +		assertTrue(rs2.lessThanOrEqual(rs1));
    +
    +		rs2 = new ResourceSpec(1.0, 100, new ResourceSpec.Resource("FPGA", 1));
    +		assertTrue(rs1.lessThanOrEqual(rs2));
    +		assertFalse(rs2.lessThanOrEqual(rs1));
    +
    +		rs1 = new ResourceSpec(1.0, 100, new ResourceSpec.Resource("FPGA", 2));
    --- End diff --
    
    reusing variable names makes things harder to follow in a tests. Thus, I would recommend introducing a fresh name.


---

[GitHub] flink issue #4911: [FLINK-7878] [api] make resource type extendible in Resou...

Posted by shuai-xu <gi...@git.apache.org>.
Github user shuai-xu commented on the issue:

    https://github.com/apache/flink/pull/4911
  
    @tillrohrmann, I make the Resource abstract and add GPUResource and FPGAResource, so user can only add such defined resources, how about it?


---

[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4911#discussion_r149098202
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java ---
    @@ -183,17 +238,81 @@ public int hashCode() {
     		result = 31 * result + directMemoryInMB;
     		result = 31 * result + nativeMemoryInMB;
     		result = 31 * result + stateSizeInMB;
    +		result = 31 * result + extendedResources.hashCode();
     		return result;
     	}
     
     	@Override
     	public String toString() {
    +		String extend = "";
    +		for (Resource resource : extendedResources.values()) {
    +			extend += ", " + resource.name + "=" + resource.value;
    +		}
     		return "ResourceSpec{" +
     				"cpuCores=" + cpuCores +
     				", heapMemoryInMB=" + heapMemoryInMB +
     				", directMemoryInMB=" + directMemoryInMB +
     				", nativeMemoryInMB=" + nativeMemoryInMB +
    -				", stateSizeInMB=" + stateSizeInMB +
    +				", stateSizeInMB=" + stateSizeInMB + extend +
     				'}';
     	}
    +
    +	private void addResource(String name, double value, ResourceAggregateType type) {
    +		extendedResources.put(name, new Resource(name, type, value));
    +	}
    +
    +	public static class Resource {
    +		private String name;
    +		private ResourceAggregateType type;
    +		private Double value;
    +
    +		public Resource(String name, double value) {
    +			this(name, ResourceAggregateType.AGGREGATE_TYPE_SUM, value);
    --- End diff --
    
    `ResourceAggregateType` should be the last argument since `name` and `value` are properly passed to this method.


---

[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4911#discussion_r149096457
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java ---
    @@ -0,0 +1,144 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.operators;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +public class ResourceSpecTest {
    --- End diff --
    
    JavaDocs would also be good.


---

[GitHub] flink issue #4911: [FLINK-7878] [api] make resource type extendible in Resou...

Posted by shuai-xu <gi...@git.apache.org>.
Github user shuai-xu commented on the issue:

    https://github.com/apache/flink/pull/4911
  
    I run the failed test on my machine and it pass, and it seems my changes will not influence it.


---

[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

Posted by shuai-xu <gi...@git.apache.org>.
Github user shuai-xu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4911#discussion_r149610252
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java ---
    @@ -61,18 +79,17 @@
     	/** How many state size in mb are used */
     	private final int stateSizeInMB;
     
    +	private final Map<String, Resource> extendedResources = new HashMap<>(1);
    --- End diff --
    
    done, and add a test for it.


---

[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4911#discussion_r154964974
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java ---
    @@ -133,14 +162,31 @@ public int getStateSize() {
     		return this.stateSizeInMB;
     	}
     
    +	public Map<String, Double> getExtendedResources() {
    +		Map<String, Double> resources = new HashMap<>(extendedResources.size());
    +		for (Resource resource : extendedResources.values()) {
    +			resources.put(resource.name, resource.value);
    +		}
    +		return Collections.unmodifiableMap(resources);
    +	}
    --- End diff --
    
    Let's remove this method because it expose the internal resources.


---

[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4911#discussion_r154499927
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java ---
    @@ -183,17 +240,124 @@ public int hashCode() {
     		result = 31 * result + directMemoryInMB;
     		result = 31 * result + nativeMemoryInMB;
     		result = 31 * result + stateSizeInMB;
    +		result = 31 * result + extendedResources.hashCode();
     		return result;
     	}
     
     	@Override
     	public String toString() {
    +		String extend = "";
    +		for (Resource resource : extendedResources.values()) {
    +			extend += ", " + resource.name + "=" + resource.value;
    +		}
     		return "ResourceSpec{" +
     				"cpuCores=" + cpuCores +
     				", heapMemoryInMB=" + heapMemoryInMB +
     				", directMemoryInMB=" + directMemoryInMB +
     				", nativeMemoryInMB=" + nativeMemoryInMB +
    -				", stateSizeInMB=" + stateSizeInMB +
    +				", stateSizeInMB=" + stateSizeInMB + extend +
     				'}';
     	}
    +
    +	public static abstract class Resource implements Serializable {
    +		final private String name;
    +
    +		final private Double value;
    +
    +		final private ResourceAggregateType type;
    +
    +		public Resource(String name, double value, ResourceAggregateType type) {
    +			this.name = checkNotNull(name);
    +			this.value = Double.valueOf(value);
    +			this.type = checkNotNull(type);
    +		}
    +
    +		Resource merge(Resource other) {
    +			Preconditions.checkArgument(getClass() == other.getClass(), "Merge with different resource type");
    +			Preconditions.checkArgument(this.name.equals(other.name), "Merge with different resource name");
    +			Preconditions.checkArgument(this.type.equals(other.type), "Merge with different aggregate type");
    +
    +			Double value = null;
    +			switch (type) {
    +				case AGGREGATE_TYPE_MAX :
    +					value = other.value.compareTo(this.value) > 0 ? other.value : this.value;
    +					break;
    +
    +				case AGGREGATE_TYPE_SUM:
    +				default:
    +					value = this.value + other.value;
    +			}
    +
    +			Resource resource = create(value, type);
    +			return resource;
    +		}
    +
    +		@Override
    +		public boolean equals(Object o) {
    +			if (this == o) {
    +				return true;
    +			} else if (o != null && getClass() == o.getClass()) {
    +				Resource other = (Resource) o;
    +
    +				return name.equals(other.name) && type.equals(other.type) && value.equals(other.value);
    +			} else {
    +				return false;
    +			}
    +		}
    +
    +		@Override
    +		public int hashCode() {
    +			int result = name != null ? name.hashCode() : 0;
    +			result = 31 * result + type.ordinal();
    +			result = 31 * result + value.hashCode();
    +			return result;
    +		}
    +
    +		/**
    +		 * create a resource of the same resource type
    --- End diff --
    
    Capital letter


---

[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4911#discussion_r154965306
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java ---
    @@ -183,17 +240,124 @@ public int hashCode() {
     		result = 31 * result + directMemoryInMB;
     		result = 31 * result + nativeMemoryInMB;
     		result = 31 * result + stateSizeInMB;
    +		result = 31 * result + extendedResources.hashCode();
     		return result;
     	}
     
     	@Override
     	public String toString() {
    +		String extend = "";
    +		for (Resource resource : extendedResources.values()) {
    +			extend += ", " + resource.name + "=" + resource.value;
    +		}
     		return "ResourceSpec{" +
     				"cpuCores=" + cpuCores +
     				", heapMemoryInMB=" + heapMemoryInMB +
     				", directMemoryInMB=" + directMemoryInMB +
     				", nativeMemoryInMB=" + nativeMemoryInMB +
    -				", stateSizeInMB=" + stateSizeInMB +
    +				", stateSizeInMB=" + stateSizeInMB + extend +
     				'}';
     	}
    +
    +	public static abstract class Resource implements Serializable {
    +		final private String name;
    +
    +		final private Double value;
    --- End diff --
    
    Wrong order of keywords: `private final`


---

[GitHub] flink issue #4911: [FLINK-7878] [api] make resource type extendible in Resou...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4911
  
    I think what you are referring to is the current trunk branch of Hadoop where they introduce `ResourceInformation`. Currently, the set of supported resources is  `"memory-mb", "vcores", "yarn.io/gpu"`. Given that you have to exactly set these names for the resource which are exclusive for `YARN`, I would actually say that we should explicitly add these resources (e.g. introducing a proper `GPUResource`). That way, this would also work if you let your job run on Mesos without having to adapt the resource names. Adding more resources should be fairly easy and therefore we don't need the generic resource. Does that make sense @shuai-xu. 


---

[GitHub] flink issue #4911: [FLINK-7878] [api] make resource type extendible in Resou...

Posted by shuai-xu <gi...@git.apache.org>.
Github user shuai-xu commented on the issue:

    https://github.com/apache/flink/pull/4911
  
    @tillrohrmann, Yes, the Resource is a little too generic and prone to typos. However, the resource are various and closely related to the platform(YARN/MESOS), only a GPUResource and FPGAResource may not satisfy user's need. For example, we have at lease two types of FPGA resources in our cluster. And it could consider the users who need to specify extended resources as advanced users. General users only need to know vcore and memory, which are already defined in ResurceSpec. Advanced users should be familiar with not only flink but also the resource platform. They should know the resources types YARN/MESOS supports. And, If flink resource manager passes all the extended resource to YARN/MESOS when starting a container, it need not change when adding a new resource type only if YARN/MESOS can recognize it from extended resources. There has to be a compromise between extendibility and ease of use. I suggest we can add a general GPUResource and FPGAResource for general use while st
 ill keeping the Resource for extension. Does this make sense?


---

[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4911#discussion_r149096221
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java ---
    @@ -183,17 +238,81 @@ public int hashCode() {
     		result = 31 * result + directMemoryInMB;
     		result = 31 * result + nativeMemoryInMB;
     		result = 31 * result + stateSizeInMB;
    +		result = 31 * result + extendedResources.hashCode();
     		return result;
     	}
     
     	@Override
     	public String toString() {
    +		String extend = "";
    +		for (Resource resource : extendedResources.values()) {
    +			extend += ", " + resource.name + "=" + resource.value;
    +		}
     		return "ResourceSpec{" +
     				"cpuCores=" + cpuCores +
     				", heapMemoryInMB=" + heapMemoryInMB +
     				", directMemoryInMB=" + directMemoryInMB +
     				", nativeMemoryInMB=" + nativeMemoryInMB +
    -				", stateSizeInMB=" + stateSizeInMB +
    +				", stateSizeInMB=" + stateSizeInMB + extend +
     				'}';
     	}
    +
    +	private void addResource(String name, double value, ResourceAggregateType type) {
    +		extendedResources.put(name, new Resource(name, type, value));
    +	}
    +
    +	public static class Resource {
    +		private String name;
    +		private ResourceAggregateType type;
    +		private Double value;
    +
    +		public Resource(String name, double value) {
    +			this(name, ResourceAggregateType.AGGREGATE_TYPE_SUM, value);
    +		}
    +
    +		public Resource(String name, ResourceAggregateType type, double value) {
    +			this.name = name;
    +			this.type = type;
    --- End diff --
    
    `checkNotNull` missing


---

[GitHub] flink issue #4911: [FLINK-7878] [api] make resource type extendible in Resou...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4911
  
    How do you pass the resource specification in a generic way to Yarn and Mesos? Is there some kind of interface defined for that?


---

[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4911#discussion_r154499904
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java ---
    @@ -183,17 +240,124 @@ public int hashCode() {
     		result = 31 * result + directMemoryInMB;
     		result = 31 * result + nativeMemoryInMB;
     		result = 31 * result + stateSizeInMB;
    +		result = 31 * result + extendedResources.hashCode();
     		return result;
     	}
     
     	@Override
     	public String toString() {
    +		String extend = "";
    +		for (Resource resource : extendedResources.values()) {
    +			extend += ", " + resource.name + "=" + resource.value;
    +		}
     		return "ResourceSpec{" +
     				"cpuCores=" + cpuCores +
     				", heapMemoryInMB=" + heapMemoryInMB +
     				", directMemoryInMB=" + directMemoryInMB +
     				", nativeMemoryInMB=" + nativeMemoryInMB +
    -				", stateSizeInMB=" + stateSizeInMB +
    +				", stateSizeInMB=" + stateSizeInMB + extend +
     				'}';
     	}
    +
    +	public static abstract class Resource implements Serializable {
    +		final private String name;
    +
    +		final private Double value;
    --- End diff --
    
    use primitive type


---

[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4911


---

[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4911#discussion_r149096169
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java ---
    @@ -183,17 +238,81 @@ public int hashCode() {
     		result = 31 * result + directMemoryInMB;
     		result = 31 * result + nativeMemoryInMB;
     		result = 31 * result + stateSizeInMB;
    +		result = 31 * result + extendedResources.hashCode();
     		return result;
     	}
     
     	@Override
     	public String toString() {
    +		String extend = "";
    +		for (Resource resource : extendedResources.values()) {
    +			extend += ", " + resource.name + "=" + resource.value;
    +		}
     		return "ResourceSpec{" +
     				"cpuCores=" + cpuCores +
     				", heapMemoryInMB=" + heapMemoryInMB +
     				", directMemoryInMB=" + directMemoryInMB +
     				", nativeMemoryInMB=" + nativeMemoryInMB +
    -				", stateSizeInMB=" + stateSizeInMB +
    +				", stateSizeInMB=" + stateSizeInMB + extend +
     				'}';
     	}
    +
    +	private void addResource(String name, double value, ResourceAggregateType type) {
    --- End diff --
    
    Who calls this method?


---

[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4911#discussion_r149097234
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java ---
    @@ -0,0 +1,144 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.operators;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +public class ResourceSpecTest {
    +
    +	@Test
    +	public void testIsValid() throws Exception {
    +		ResourceSpec rs = new ResourceSpec(1.0, 100);
    +		assertTrue(rs.isValid());
    +
    +		rs = new ResourceSpec(1.0, 100, new ResourceSpec.Resource("GPU", 1));
    +		assertTrue(rs.isValid());
    +
    +		rs = new ResourceSpec(1.0, 100, new ResourceSpec.Resource("GPU", -1));
    +		assertFalse(rs.isValid());
    +	}
    +
    +	@Test
    +	public void testLessThanOrEqual() throws Exception {
    +		ResourceSpec rs1 = new ResourceSpec(1.0, 100);
    +		ResourceSpec rs2 = new ResourceSpec(1.0, 100);
    +		assertTrue(rs1.lessThanOrEqual(rs2));
    +		assertTrue(rs2.lessThanOrEqual(rs1));
    +
    +		rs2 = new ResourceSpec(1.0, 100, new ResourceSpec.Resource("FPGA", 1));
    +		assertTrue(rs1.lessThanOrEqual(rs2));
    +		assertFalse(rs2.lessThanOrEqual(rs1));
    +
    +		rs1 = new ResourceSpec(1.0, 100, new ResourceSpec.Resource("FPGA", 2));
    +		assertFalse(rs1.lessThanOrEqual(rs2));
    +		assertTrue(rs2.lessThanOrEqual(rs1));
    +
    +		rs2 = new ResourceSpec(1.0, 100,
    +				new ResourceSpec.Resource("FPGA", 1),
    +				new ResourceSpec.Resource("GPU", 1));
    +		assertFalse(rs1.lessThanOrEqual(rs2));
    +		assertFalse(rs2.lessThanOrEqual(rs1));
    +	}
    +
    +	@Test
    +	public void testEquals() throws Exception {
    +		ResourceSpec rs1 = new ResourceSpec(1.0, 100);
    +		ResourceSpec rs2 = new ResourceSpec(1.0, 100);
    +		assertTrue(rs1.equals(rs2));
    +		assertTrue(rs2.equals(rs1));
    +
    +		rs1 = new ResourceSpec(1.0, 100, new ResourceSpec.Resource("FPGA", 2.2));
    +		rs2 = new ResourceSpec(1.0, 100, new ResourceSpec.Resource("FPGA", 1));
    +		assertFalse(rs1.equals(rs2));
    +
    +		rs2 = new ResourceSpec(1.0, 100, new ResourceSpec.Resource("FPGA", 2.2));
    +		assertTrue(rs1.equals(rs2));
    +
    +		rs1 = new ResourceSpec(1.0, 100,
    +				new ResourceSpec.Resource("FPGA", 2),
    +				new ResourceSpec.Resource("GPU", 0.5));
    +		rs2 = new ResourceSpec(1.0, 100,
    +				new ResourceSpec.Resource("FPGA", 2),
    +				new ResourceSpec.Resource("GPU", ResourceSpec.ResourceAggregateType.AGGREGATE_TYPE_MAX, 0.5));
    +		assertFalse(rs1.equals(rs2));
    +	}
    +
    +	@Test
    +	public void testHashCode() throws Exception {
    +		ResourceSpec rs1 = new ResourceSpec(1.0, 100);
    +		ResourceSpec rs2 = new ResourceSpec(1.0, 100);
    +		assertEquals(rs1.hashCode(), rs2.hashCode());
    +
    +		rs1 = new ResourceSpec(1.0, 100, new ResourceSpec.Resource("FPGA", 2.2));
    +		rs2 = new ResourceSpec(1.0, 100, new ResourceSpec.Resource("FPGA", 1));
    +		assertFalse(rs1.hashCode() == rs2.hashCode());
    +
    +		rs2 = new ResourceSpec(1.0, 100, new ResourceSpec.Resource("FPGA", 2.2));
    +		assertEquals(rs1.hashCode(), rs2.hashCode());
    +
    +		rs1 = new ResourceSpec(1.0, 100,
    +				new ResourceSpec.Resource("FPGA", 2),
    +				new ResourceSpec.Resource("GPU", 0.5));
    +		rs2 = new ResourceSpec(1.0, 100,
    +				new ResourceSpec.Resource("FPGA", 2),
    +				new ResourceSpec.Resource("GPU", ResourceSpec.ResourceAggregateType.AGGREGATE_TYPE_MAX, 0.5));
    +		assertFalse(rs1.hashCode() == rs2.hashCode());
    +	}
    +
    +	@Test
    +	public void testMerge() throws Exception {
    +		ResourceSpec rs1 = new ResourceSpec(1.0, 100, new ResourceSpec.Resource("FPGA", 1.1));
    +		ResourceSpec rs2 = new ResourceSpec(1.0, 100);
    +
    +		ResourceSpec rs3 = rs1.merge(rs2);
    +		assertEquals(1.1, rs3.getExtendedResources().get("FPGA").doubleValue(), 0.000001);
    +
    +		ResourceSpec rs4 = rs1.merge(rs3);
    +		assertEquals(2.2, rs4.getExtendedResources().get("FPGA").doubleValue(), 0.000001);
    +
    +		rs1 = new ResourceSpec(1.0, 100,
    +				new ResourceSpec.Resource("FPGA", 2),
    +				new ResourceSpec.Resource("GPU", 0.5));
    +		ResourceSpec rs5 = rs1.merge(rs2);
    +		assertEquals(2, rs5.getExtendedResources().get("FPGA").doubleValue(), 0.000001);
    +		assertEquals(0.5, rs5.getExtendedResources().get("GPU").doubleValue(), 0.000001);
    +
    +		rs2 = new ResourceSpec(1.0, 100,
    +				new ResourceSpec.Resource("GPU", ResourceSpec.ResourceAggregateType.AGGREGATE_TYPE_MAX, 0.5));
    +		try {
    +			rs1.merge(rs2);
    +			fail("Merge with different aggregate type should fail");
    +		} catch (IllegalArgumentException e) {
    --- End diff --
    
    rename `e` to `ignored`


---

[GitHub] flink issue #4911: [FLINK-7878] [api] make resource type extendible in Resou...

Posted by shuai-xu <gi...@git.apache.org>.
Github user shuai-xu commented on the issue:

    https://github.com/apache/flink/pull/4911
  
    @tillrohrmann , I agree with you that adding a build looks better, I changed it according to your comments. Do you think it works now?


---

[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4911#discussion_r154965613
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java ---
    @@ -183,17 +240,124 @@ public int hashCode() {
     		result = 31 * result + directMemoryInMB;
     		result = 31 * result + nativeMemoryInMB;
     		result = 31 * result + stateSizeInMB;
    +		result = 31 * result + extendedResources.hashCode();
     		return result;
     	}
     
     	@Override
     	public String toString() {
    +		String extend = "";
    +		for (Resource resource : extendedResources.values()) {
    +			extend += ", " + resource.name + "=" + resource.value;
    +		}
     		return "ResourceSpec{" +
     				"cpuCores=" + cpuCores +
     				", heapMemoryInMB=" + heapMemoryInMB +
     				", directMemoryInMB=" + directMemoryInMB +
     				", nativeMemoryInMB=" + nativeMemoryInMB +
    -				", stateSizeInMB=" + stateSizeInMB +
    +				", stateSizeInMB=" + stateSizeInMB + extend +
     				'}';
     	}
    +
    +	public static abstract class Resource implements Serializable {
    +		final private String name;
    +
    +		final private Double value;
    +
    +		final private ResourceAggregateType type;
    +
    +		public Resource(String name, double value, ResourceAggregateType type) {
    +			this.name = checkNotNull(name);
    +			this.value = Double.valueOf(value);
    +			this.type = checkNotNull(type);
    +		}
    +
    +		Resource merge(Resource other) {
    +			Preconditions.checkArgument(getClass() == other.getClass(), "Merge with different resource type");
    +			Preconditions.checkArgument(this.name.equals(other.name), "Merge with different resource name");
    +			Preconditions.checkArgument(this.type.equals(other.type), "Merge with different aggregate type");
    +
    +			Double value = null;
    +			switch (type) {
    +				case AGGREGATE_TYPE_MAX :
    +					value = other.value.compareTo(this.value) > 0 ? other.value : this.value;
    +					break;
    +
    +				case AGGREGATE_TYPE_SUM:
    +				default:
    +					value = this.value + other.value;
    +			}
    +
    +			Resource resource = create(value, type);
    +			return resource;
    +		}
    +
    +		@Override
    +		public boolean equals(Object o) {
    +			if (this == o) {
    +				return true;
    +			} else if (o != null && getClass() == o.getClass()) {
    +				Resource other = (Resource) o;
    +
    +				return name.equals(other.name) && type.equals(other.type) && value.equals(other.value);
    +			} else {
    +				return false;
    +			}
    +		}
    +
    +		@Override
    +		public int hashCode() {
    +			int result = name != null ? name.hashCode() : 0;
    +			result = 31 * result + type.ordinal();
    +			result = 31 * result + value.hashCode();
    +			return result;
    +		}
    +
    +		/**
    +		 * create a resource of the same resource type
    +		 *
    +		 * @param value the value of the resource
    +		 * @param type the aggregate type of the resource
    +		 * @return a new instance of the sub resource
    +		 */
    +		protected abstract Resource create(double value, ResourceAggregateType type);
    +	}
    +
    +	/**
    +	 * The GPU resource.
    +	 */
    +	public static class GPUResource extends Resource {
    +
    +		public GPUResource(double value) {
    +			this(value, ResourceAggregateType.AGGREGATE_TYPE_SUM);
    +		}
    +
    +		public GPUResource(double value, ResourceAggregateType type) {
    +			super("GPU", value, type);
    +		}
    +
    +		@Override
    +		public Resource create(double value, ResourceAggregateType type) {
    +			return new GPUResource(value, type);
    +		}
    +	}
    +
    +	/**
    +	 * The FPGA resource.
    +	 */
    +	public static class FPGAResource extends Resource {
    --- End diff --
    
    I think this resource is too specific for Flink right now. Therefore I would remove it and only keep the `GPUResource`.


---

[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4911#discussion_r149098417
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java ---
    @@ -61,18 +79,17 @@
     	/** How many state size in mb are used */
     	private final int stateSizeInMB;
     
    +	private final Map<String, Resource> extendedResources = new HashMap<>(1);
    --- End diff --
    
    This violates the serializability of `ResourceSpec` if `Resource` itself is not serializable.


---

[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4911#discussion_r154500054
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java ---
    @@ -83,18 +102,23 @@ public ResourceSpec(double cpuCores, int heapMemoryInMB) {
     	 * @param directMemoryInMB The size of the java nio direct memory, in megabytes.
     	 * @param nativeMemoryInMB The size of the native memory, in megabytes.
     	 * @param stateSizeInMB The state size for storing in checkpoint.
    +	 * @param extendedResources The extended resources, associated with the resource manager used
     	 */
     	public ResourceSpec(
     			double cpuCores,
     			int heapMemoryInMB,
     			int directMemoryInMB,
     			int nativeMemoryInMB,
    -			int stateSizeInMB) {
    +			int stateSizeInMB,
    +			Resource... extendedResources) {
    --- End diff --
    
    I think we have far too many constructors to create a `ResourceSpec`. I would suggest to offer a single constructor plus a builder for the `ResourceSpec`. This builder should in the initial version only allow to set a GPU resource and no other resources. Moreover, we should make this constructor protected.


---

[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4911#discussion_r154500041
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.operators;
    +
    +import org.apache.flink.util.InstantiationUtil;
    +import org.apache.flink.util.TestLogger;
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +/**
    + * Tests for ResourceSpec class, including its all public api: isValid, lessThanOrEqual, equals, hashCode and merge.
    + */
    +public class ResourceSpecTest extends TestLogger {
    +
    +	@Test
    +	public void testIsValid() throws Exception {
    +		ResourceSpec rs = new ResourceSpec(1.0, 100);
    +		assertTrue(rs.isValid());
    +
    +		rs = new ResourceSpec(1.0, 100, new ResourceSpec.GPUResource(1));
    +		assertTrue(rs.isValid());
    +
    +		rs = new ResourceSpec(1.0, 100, new ResourceSpec.GPUResource(-1));
    +		assertFalse(rs.isValid());
    +	}
    +
    +	@Test
    +	public void testLessThanOrEqual() throws Exception {
    +		ResourceSpec rs1 = new ResourceSpec(1.0, 100);
    +		ResourceSpec rs2 = new ResourceSpec(1.0, 100);
    +		assertTrue(rs1.lessThanOrEqual(rs2));
    +		assertTrue(rs2.lessThanOrEqual(rs1));
    +
    +		rs2 = new ResourceSpec(1.0, 100, new ResourceSpec.FPGAResource(1));
    +		assertTrue(rs1.lessThanOrEqual(rs2));
    +		assertFalse(rs2.lessThanOrEqual(rs1));
    +
    +		ResourceSpec rs3 = new ResourceSpec(1.0, 100, new ResourceSpec.FPGAResource(2));
    +		assertFalse(rs3.lessThanOrEqual(rs2));
    +		assertTrue(rs2.lessThanOrEqual(rs3));
    +
    +		ResourceSpec rs4 = new ResourceSpec(1.0, 100,
    +				new ResourceSpec.FPGAResource(1),
    +				new ResourceSpec.GPUResource( 1));
    +		assertFalse(rs3.lessThanOrEqual(rs4));
    +		assertFalse(rs4.lessThanOrEqual(rs3));
    +	}
    +
    +	@Test
    +	public void testEquals() throws Exception {
    +		ResourceSpec rs1 = new ResourceSpec(1.0, 100);
    +		ResourceSpec rs2 = new ResourceSpec(1.0, 100);
    +		assertTrue(rs1.equals(rs2));
    +		assertTrue(rs2.equals(rs1));
    +
    +		ResourceSpec rs3 = new ResourceSpec(1.0, 100, new ResourceSpec.FPGAResource(2.2));
    +		ResourceSpec rs4 = new ResourceSpec(1.0, 100, new ResourceSpec.FPGAResource( 1));
    +		assertFalse(rs3.equals(rs4));
    +
    +		ResourceSpec rs5 = new ResourceSpec(1.0, 100, new ResourceSpec.FPGAResource(2.2));
    +		assertTrue(rs3.equals(rs5));
    +
    +		ResourceSpec rs6 = new ResourceSpec(1.0, 100,
    +				new ResourceSpec.FPGAResource(2),
    +				new ResourceSpec.GPUResource( 0.5));
    +		ResourceSpec rs7 = new ResourceSpec(1.0, 100,
    +				new ResourceSpec.FPGAResource( 2),
    +				new ResourceSpec.GPUResource(0.5, ResourceSpec.ResourceAggregateType.AGGREGATE_TYPE_MAX));
    +		assertFalse(rs6.equals(rs7));
    +	}
    +
    +	@Test
    +	public void testHashCode() throws Exception {
    +		ResourceSpec rs1 = new ResourceSpec(1.0, 100);
    +		ResourceSpec rs2 = new ResourceSpec(1.0, 100);
    +		assertEquals(rs1.hashCode(), rs2.hashCode());
    +
    +		ResourceSpec rs3 = new ResourceSpec(1.0, 100, new ResourceSpec.FPGAResource(2.2));
    +		ResourceSpec rs4 = new ResourceSpec(1.0, 100, new ResourceSpec.FPGAResource(1));
    +		assertFalse(rs3.hashCode() == rs4.hashCode());
    +
    +		ResourceSpec rs5 = new ResourceSpec(1.0, 100, new ResourceSpec.FPGAResource( 2.2));
    +		assertEquals(rs3.hashCode(), rs5.hashCode());
    +
    +		ResourceSpec rs6 = new ResourceSpec(1.0, 100,
    +				new ResourceSpec.FPGAResource( 2),
    +				new ResourceSpec.GPUResource(0.5));
    +		ResourceSpec rs7 = new ResourceSpec(1.0, 100,
    +				new ResourceSpec.FPGAResource(2),
    +				new ResourceSpec.GPUResource(0.5, ResourceSpec.ResourceAggregateType.AGGREGATE_TYPE_MAX));
    +		assertFalse(rs6.hashCode() == rs7.hashCode());
    +	}
    +
    +	@Test
    +	public void testMerge() throws Exception {
    +		ResourceSpec rs1 = new ResourceSpec(1.0, 100, new ResourceSpec.FPGAResource(1.1));
    +		ResourceSpec rs2 = new ResourceSpec(1.0, 100);
    +
    +		ResourceSpec rs3 = rs1.merge(rs2);
    +		assertEquals(1.1, rs3.getExtendedResources().get("FPGA").doubleValue(), 0.000001);
    +
    +		ResourceSpec rs4 = rs1.merge(rs3);
    +		assertEquals(2.2, rs4.getExtendedResources().get("FPGA").doubleValue(), 0.000001);
    +
    +		ResourceSpec rs6 = new ResourceSpec(1.0, 100,
    +				new ResourceSpec.FPGAResource(2),
    +				new ResourceSpec.GPUResource(0.5));
    +		ResourceSpec rs5 = rs6.merge(rs2);
    +		assertEquals(2, rs5.getExtendedResources().get("FPGA").doubleValue(), 0.000001);
    +		assertEquals(0.5, rs5.getExtendedResources().get("GPU").doubleValue(), 0.000001);
    +
    +		ResourceSpec rs7 = new ResourceSpec(1.0, 100,
    +				new ResourceSpec.GPUResource( 0.5, ResourceSpec.ResourceAggregateType.AGGREGATE_TYPE_MAX));
    +		try {
    +			rs6.merge(rs7);
    +			fail("Merge with different aggregate type should fail");
    +		} catch (IllegalArgumentException ignored) {
    +		}
    +
    +		ResourceSpec rs8 = new ResourceSpec(1.0, 100,
    +				new ResourceSpec.FPGAResource(2),
    +				new ResourceSpec.GPUResource(1.5, ResourceSpec.ResourceAggregateType.AGGREGATE_TYPE_MAX));
    +		ResourceSpec rs9 = rs8.merge(rs7);
    +		assertEquals(2, rs9.getExtendedResources().get("FPGA").doubleValue(), 0.000001);
    +		assertEquals(1.5, rs9.getExtendedResources().get("GPU").doubleValue(), 0.000001);
    +
    +	}
    +
    +	@Test
    +	public void testSerializable() throws Exception {
    +		ResourceSpec rs1 = new ResourceSpec(1.0, 100, new ResourceSpec.FPGAResource(1.1));
    +		byte[] buffer = InstantiationUtil.serializeObject(rs1);
    +		ResourceSpec rs2 = InstantiationUtil.deserializeObject(buffer, ClassLoader.getSystemClassLoader());
    +		assertTrue(rs1.equals(rs2));
    --- End diff --
    
    better to use `assertEquals`


---

[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4911#discussion_r154499917
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java ---
    @@ -183,17 +240,124 @@ public int hashCode() {
     		result = 31 * result + directMemoryInMB;
     		result = 31 * result + nativeMemoryInMB;
     		result = 31 * result + stateSizeInMB;
    +		result = 31 * result + extendedResources.hashCode();
     		return result;
     	}
     
     	@Override
     	public String toString() {
    +		String extend = "";
    +		for (Resource resource : extendedResources.values()) {
    +			extend += ", " + resource.name + "=" + resource.value;
    +		}
     		return "ResourceSpec{" +
     				"cpuCores=" + cpuCores +
     				", heapMemoryInMB=" + heapMemoryInMB +
     				", directMemoryInMB=" + directMemoryInMB +
     				", nativeMemoryInMB=" + nativeMemoryInMB +
    -				", stateSizeInMB=" + stateSizeInMB +
    +				", stateSizeInMB=" + stateSizeInMB + extend +
     				'}';
     	}
    +
    +	public static abstract class Resource implements Serializable {
    +		final private String name;
    +
    +		final private Double value;
    +
    +		final private ResourceAggregateType type;
    +
    +		public Resource(String name, double value, ResourceAggregateType type) {
    +			this.name = checkNotNull(name);
    +			this.value = Double.valueOf(value);
    +			this.type = checkNotNull(type);
    +		}
    +
    +		Resource merge(Resource other) {
    +			Preconditions.checkArgument(getClass() == other.getClass(), "Merge with different resource type");
    +			Preconditions.checkArgument(this.name.equals(other.name), "Merge with different resource name");
    +			Preconditions.checkArgument(this.type.equals(other.type), "Merge with different aggregate type");
    +
    +			Double value = null;
    +			switch (type) {
    +				case AGGREGATE_TYPE_MAX :
    +					value = other.value.compareTo(this.value) > 0 ? other.value : this.value;
    --- End diff --
    
    `Math.max` should do the trick here.


---

[GitHub] flink issue #4911: [FLINK-7878] [api] make resource type extendible in Resou...

Posted by shuai-xu <gi...@git.apache.org>.
Github user shuai-xu commented on the issue:

    https://github.com/apache/flink/pull/4911
  
    @tillrohrmann, There is not a generic way for both Yarn and Mesos, as their resource allocation interface are different. I think the YARN/MESOS resource manager should handle it in their own way. For example, in YarnResourceManager, it can add all extended resources to the yarn Resource.class by call setResourceValue(name, value). And then only if YARN support a new resource type, user can define it without code changing in flink. 


---

[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4911#discussion_r154965118
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java ---
    @@ -37,11 +44,24 @@
      *     <li>Direct Memory Size</li>
      *     <li>Native Memory Size</li>
      *     <li>State Size</li>
    + *     <li>Extended resources</li>
      * </ol>
      */
     @Internal
     public class ResourceSpec implements Serializable {
     
    +	public enum ResourceAggregateType {
    +		/**
    +		 * Denotes keeping the sum of the values with same name when merging two resource specs for operator chaining
    +		 */
    +		AGGREGATE_TYPE_SUM,
    +
    +		/**
    +		 * Denotes keeping the max of the values with same name when merging two resource specs for operator chaining
    +		 */
    +		AGGREGATE_TYPE_MAX
    +	}
    --- End diff --
    
    Let's move this enum to `Resource`


---

[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4911#discussion_r149096103
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java ---
    @@ -183,17 +238,81 @@ public int hashCode() {
     		result = 31 * result + directMemoryInMB;
     		result = 31 * result + nativeMemoryInMB;
     		result = 31 * result + stateSizeInMB;
    +		result = 31 * result + extendedResources.hashCode();
     		return result;
     	}
     
     	@Override
     	public String toString() {
    +		String extend = "";
    +		for (Resource resource : extendedResources.values()) {
    +			extend += ", " + resource.name + "=" + resource.value;
    +		}
     		return "ResourceSpec{" +
     				"cpuCores=" + cpuCores +
     				", heapMemoryInMB=" + heapMemoryInMB +
     				", directMemoryInMB=" + directMemoryInMB +
     				", nativeMemoryInMB=" + nativeMemoryInMB +
    -				", stateSizeInMB=" + stateSizeInMB +
    +				", stateSizeInMB=" + stateSizeInMB + extend +
     				'}';
     	}
    +
    +	private void addResource(String name, double value, ResourceAggregateType type) {
    +		extendedResources.put(name, new Resource(name, type, value));
    +	}
    +
    +	public static class Resource {
    +		private String name;
    +		private ResourceAggregateType type;
    +		private Double value;
    --- End diff --
    
    fields should be `final`


---

[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4911#discussion_r154964117
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java ---
    @@ -61,18 +81,17 @@
     	/** How many state size in mb are used */
     	private final int stateSizeInMB;
     
    +	private final Map<String, Resource> extendedResources = new HashMap<>(1);
    +
     	/**
     	 * Creates a new ResourceSpec with basic common resources.
     	 *
     	 * @param cpuCores The number of CPU cores (possibly fractional, i.e., 0.2 cores)
     	 * @param heapMemoryInMB The size of the java heap memory, in megabytes.
    +	 * @param extendedResources The extended resources, associated with the resource manager used
     	 */
    -	public ResourceSpec(double cpuCores, int heapMemoryInMB) {
    -		this.cpuCores = cpuCores;
    -		this.heapMemoryInMB = heapMemoryInMB;
    -		this.directMemoryInMB = 0;
    -		this.nativeMemoryInMB = 0;
    -		this.stateSizeInMB = 0;
    +	public ResourceSpec(double cpuCores, int heapMemoryInMB, Resource... extendedResources) {
    +		this(cpuCores, heapMemoryInMB, 0, 0, 0, extendedResources);
    --- End diff --
    
    Let's remove this constructor.


---