You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/14 17:48:19 UTC

[2/2] flink git commit: [FLINK-7878] [api] make resource type extendible in ResourceSpec

[FLINK-7878] [api] make resource type extendible in ResourceSpec

Summary:
Now, flink only support user define CPU and MEM,
but some user need to specify the GPU, FPGA and so on resources.
So it need to make the resouce type extendible in the ResourceSpec.
Add a extend field for new resources.

Test Plan: UnitTest

Reviewers: haitao.w

Differential Revision: https://aone.alibaba-inc.com/code/D327427

make Resource abstract and add GPUResource FPGAResource

This closes #4911.

Add a resource spec builder and remove FPGAResource


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5b9ac950
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5b9ac950
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5b9ac950

Branch: refs/heads/master
Commit: 5b9ac9508b5d16f85b76a6de940458d385e23f0d
Parents: 917fbcb
Author: shuai.xus <sh...@alibaba-inc.com>
Authored: Wed Oct 25 14:56:35 2017 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 14 15:31:40 2017 +0100

----------------------------------------------------------------------
 .../api/common/operators/ResourceSpec.java      | 228 +++++++++++++++++--
 .../api/common/operators/ResourceSpecTest.java  | 186 +++++++++++++++
 .../flink/api/java/operator/OperatorTest.java   |   4 +-
 .../plantranslate/JobGraphGeneratorTest.java    |  26 +--
 .../flink/streaming/api/DataStreamTest.java     |  28 +--
 .../graph/StreamingJobGraphGeneratorTest.java   |  20 +-
 6 files changed, 432 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5b9ac950/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
index cd3e5ad..f87d997 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
@@ -19,9 +19,15 @@
 package org.apache.flink.api.common.operators;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
 import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Describe the different resource factors of the operator with UDF.
@@ -37,6 +43,7 @@ import java.io.Serializable;
  *     <li>Direct Memory Size</li>
  *     <li>Native Memory Size</li>
  *     <li>State Size</li>
+ *     <li>Extended resources</li>
  * </ol>
  */
 @Internal
@@ -46,6 +53,8 @@ public class ResourceSpec implements Serializable {
 
 	public static final ResourceSpec DEFAULT = new ResourceSpec(0, 0, 0, 0, 0);
 
+	private static String GPU_NAME = "GPU";
+
 	/** How many cpu cores are needed, use double so we can specify cpu like 0.1 */
 	private final double cpuCores;
 
@@ -61,19 +70,7 @@ public class ResourceSpec implements Serializable {
 	/** How many state size in mb are used */
 	private final int stateSizeInMB;
 
-	/**
-	 * Creates a new ResourceSpec with basic common resources.
-	 *
-	 * @param cpuCores The number of CPU cores (possibly fractional, i.e., 0.2 cores)
-	 * @param heapMemoryInMB The size of the java heap memory, in megabytes.
-	 */
-	public ResourceSpec(double cpuCores, int heapMemoryInMB) {
-		this.cpuCores = cpuCores;
-		this.heapMemoryInMB = heapMemoryInMB;
-		this.directMemoryInMB = 0;
-		this.nativeMemoryInMB = 0;
-		this.stateSizeInMB = 0;
-	}
+	private final Map<String, Resource> extendedResources = new HashMap<>(1);
 
 	/**
 	 * Creates a new ResourceSpec with full resources.
@@ -83,18 +80,25 @@ public class ResourceSpec implements Serializable {
 	 * @param directMemoryInMB The size of the java nio direct memory, in megabytes.
 	 * @param nativeMemoryInMB The size of the native memory, in megabytes.
 	 * @param stateSizeInMB The state size for storing in checkpoint.
+	 * @param extendedResources The extended resources, associated with the resource manager used
 	 */
-	public ResourceSpec(
+	protected ResourceSpec(
 			double cpuCores,
 			int heapMemoryInMB,
 			int directMemoryInMB,
 			int nativeMemoryInMB,
-			int stateSizeInMB) {
+			int stateSizeInMB,
+			Resource... extendedResources) {
 		this.cpuCores = cpuCores;
 		this.heapMemoryInMB = heapMemoryInMB;
 		this.directMemoryInMB = directMemoryInMB;
 		this.nativeMemoryInMB = nativeMemoryInMB;
 		this.stateSizeInMB = stateSizeInMB;
+		for (Resource resource : extendedResources) {
+			if (resource != null) {
+				this.extendedResources.put(resource.name, resource);
+			}
+		}
 	}
 
 	/**
@@ -105,12 +109,17 @@ public class ResourceSpec implements Serializable {
 	 * @return The new resource with merged values.
 	 */
 	public ResourceSpec merge(ResourceSpec other) {
-		return new ResourceSpec(
+		ResourceSpec target = new ResourceSpec(
 				Math.max(this.cpuCores, other.cpuCores),
 				this.heapMemoryInMB + other.heapMemoryInMB,
 				this.directMemoryInMB + other.directMemoryInMB,
 				this.nativeMemoryInMB + other.nativeMemoryInMB,
 				this.stateSizeInMB + other.stateSizeInMB);
+		target.extendedResources.putAll(extendedResources);
+		for (Resource resource : other.extendedResources.values()) {
+			target.extendedResources.merge(resource.name, resource, (v1, v2) -> v1.merge(v2));
+		}
+		return target;
 	}
 
 	public double getCpuCores() {
@@ -133,14 +142,31 @@ public class ResourceSpec implements Serializable {
 		return this.stateSizeInMB;
 	}
 
+	public double getGPUResource() {
+		Resource gpuResource = extendedResources.get(GPU_NAME);
+		if (gpuResource != null) {
+			return gpuResource.value;
+		}
+		return 0.0;
+	}
+
 	/**
 	 * Check whether all the field values are valid.
 	 *
 	 * @return True if all the values are equal or greater than 0, otherwise false.
 	 */
 	public boolean isValid() {
-		return (this.cpuCores >= 0 && this.heapMemoryInMB >= 0 && this.directMemoryInMB >= 0 &&
-				this.nativeMemoryInMB >= 0 && this.stateSizeInMB >= 0);
+		if (this.cpuCores >= 0 && this.heapMemoryInMB >= 0 && this.directMemoryInMB >= 0 &&
+				this.nativeMemoryInMB >= 0 && this.stateSizeInMB >= 0) {
+			for (Resource resource : extendedResources.values()) {
+				if (resource.value < 0) {
+					return false;
+				}
+			}
+			return true;
+		} else {
+			return false;
+		}
 	}
 
 	/**
@@ -156,7 +182,17 @@ public class ResourceSpec implements Serializable {
 		int cmp3 = Integer.compare(this.directMemoryInMB, other.directMemoryInMB);
 		int cmp4 = Integer.compare(this.nativeMemoryInMB, other.nativeMemoryInMB);
 		int cmp5 = Integer.compare(this.stateSizeInMB, other.stateSizeInMB);
-		return (cmp1 <= 0 && cmp2 <= 0 && cmp3 <= 0 && cmp4 <= 0 && cmp5 <= 0);
+		if (cmp1 <= 0 && cmp2 <= 0 && cmp3 <= 0 && cmp4 <= 0 && cmp5 <= 0) {
+			for (Resource resource : extendedResources.values()) {
+				if (!other.extendedResources.containsKey(resource.name) ||
+						!other.extendedResources.get(resource.name).type.equals(resource.type) ||
+						other.extendedResources.get(resource.name).value < resource.value) {
+					return false;
+				}
+			}
+			return true;
+		}
+		return false;
 	}
 
 	@Override
@@ -169,7 +205,8 @@ public class ResourceSpec implements Serializable {
 					this.heapMemoryInMB == that.heapMemoryInMB &&
 					this.directMemoryInMB == that.directMemoryInMB &&
 					this.nativeMemoryInMB == that.nativeMemoryInMB &&
-					this.stateSizeInMB == that.stateSizeInMB;
+					this.stateSizeInMB == that.stateSizeInMB &&
+					Objects.equals(this.extendedResources, that.extendedResources);
 		} else {
 			return false;
 		}
@@ -183,17 +220,166 @@ public class ResourceSpec implements Serializable {
 		result = 31 * result + directMemoryInMB;
 		result = 31 * result + nativeMemoryInMB;
 		result = 31 * result + stateSizeInMB;
+		result = 31 * result + extendedResources.hashCode();
 		return result;
 	}
 
 	@Override
 	public String toString() {
+		String extend = "";
+		for (Resource resource : extendedResources.values()) {
+			extend += ", " + resource.name + "=" + resource.value;
+		}
 		return "ResourceSpec{" +
 				"cpuCores=" + cpuCores +
 				", heapMemoryInMB=" + heapMemoryInMB +
 				", directMemoryInMB=" + directMemoryInMB +
 				", nativeMemoryInMB=" + nativeMemoryInMB +
-				", stateSizeInMB=" + stateSizeInMB +
+				", stateSizeInMB=" + stateSizeInMB + extend +
 				'}';
 	}
+
+	public static Builder newBuilder() { return new Builder(); }
+
+	public static class Builder {
+
+		public double cpuCores;
+		public int heapMemoryInMB;
+		private int directMemoryInMB;
+		private int nativeMemoryInMB;
+		private int stateSizeInMB;
+		private GPUResource gpuResource;
+
+		public Builder setCpuCores(double cpuCores) {
+			this.cpuCores = cpuCores;
+			return this;
+		}
+
+		public Builder setHeapMemoryInMB(int heapMemory) {
+			this.heapMemoryInMB = heapMemory;
+			return this;
+		}
+
+		public Builder setDirectMemoryInMB(int directMemory) {
+			this.directMemoryInMB = directMemory;
+			return this;
+		}
+
+		public Builder setNativeMemoryInMB(int nativeMemory) {
+			this.nativeMemoryInMB = nativeMemory;
+			return this;
+		}
+
+		public Builder setStateSizeInMB(int stateSize) {
+			this.stateSizeInMB = stateSize;
+			return this;
+		}
+
+		public Builder setGPUResource(GPUResource gpuResource) {
+			this.gpuResource = gpuResource;
+			return this;
+		}
+
+		public ResourceSpec build() {
+			return new ResourceSpec(cpuCores, heapMemoryInMB, directMemoryInMB, nativeMemoryInMB, stateSizeInMB, gpuResource);
+		}
+	}
+
+	public static abstract class Resource implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		public enum ResourceAggregateType {
+			/**
+			 * Denotes keeping the sum of the values with same name when merging two resource specs for operator chaining
+			 */
+			AGGREGATE_TYPE_SUM,
+
+			/**
+			 * Denotes keeping the max of the values with same name when merging two resource specs for operator chaining
+			 */
+			AGGREGATE_TYPE_MAX
+		}
+
+		private final String name;
+
+		private final double value;
+
+		final private ResourceAggregateType type;
+
+		public Resource(String name, double value, ResourceAggregateType type) {
+			this.name = checkNotNull(name);
+			this.value = value;
+			this.type = checkNotNull(type);
+		}
+
+		Resource merge(Resource other) {
+			Preconditions.checkArgument(getClass() == other.getClass(), "Merge with different resource type");
+			Preconditions.checkArgument(this.name.equals(other.name), "Merge with different resource name");
+			Preconditions.checkArgument(this.type.equals(other.type), "Merge with different aggregate type");
+
+			Double value = null;
+			switch (type) {
+				case AGGREGATE_TYPE_MAX :
+					value = Math.max(other.value, this.value);
+					break;
+
+				case AGGREGATE_TYPE_SUM:
+				default:
+					value = this.value + other.value;
+			}
+
+			Resource resource = create(value, type);
+			return resource;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			} else if (o != null && getClass() == o.getClass()) {
+				Resource other = (Resource) o;
+
+				return name.equals(other.name) && type.equals(other.type) && value == other.value;
+			} else {
+				return false;
+			}
+		}
+
+		@Override
+		public int hashCode() {
+			int result = name != null ? name.hashCode() : 0;
+			result = 31 * result + type.ordinal();
+			result = 31 * result + (int)value;
+			return result;
+		}
+
+		/**
+		 * Create a resource of the same resource type
+		 *
+		 * @param value The value of the resource
+		 * @param type The aggregate type of the resource
+		 * @return A new instance of the sub resource
+		 */
+		protected abstract Resource create(double value, ResourceAggregateType type);
+	}
+
+	/**
+	 * The GPU resource.
+	 */
+	public static class GPUResource extends Resource {
+
+		public GPUResource(double value) {
+			this(value, ResourceAggregateType.AGGREGATE_TYPE_SUM);
+		}
+
+		public GPUResource(double value, ResourceAggregateType type) {
+			super(GPU_NAME, value, type);
+		}
+
+		@Override
+		public Resource create(double value, ResourceAggregateType type) {
+			return new GPUResource(value, type);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b9ac950/flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java
new file mode 100644
index 0000000..5dfe5d0
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/ResourceSpecTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for ResourceSpec class, including its all public api: isValid, lessThanOrEqual, equals, hashCode and merge.
+ */
+public class ResourceSpecTest extends TestLogger {
+
+	@Test
+	public void testIsValid() throws Exception {
+		ResourceSpec rs = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
+		assertTrue(rs.isValid());
+
+		rs = ResourceSpec.newBuilder().
+				setCpuCores(1.0).
+				setHeapMemoryInMB(100).
+				setGPUResource(new ResourceSpec.GPUResource(1)).
+				build();
+		assertTrue(rs.isValid());
+
+		rs = ResourceSpec.newBuilder().
+				setCpuCores(1.0).
+				setHeapMemoryInMB(100).
+				setGPUResource(new ResourceSpec.GPUResource(-1)).
+				build();
+		assertFalse(rs.isValid());
+	}
+
+	@Test
+	public void testLessThanOrEqual() throws Exception {
+		ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
+		ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
+		assertTrue(rs1.lessThanOrEqual(rs2));
+		assertTrue(rs2.lessThanOrEqual(rs1));
+
+		ResourceSpec rs3 = ResourceSpec.newBuilder().
+				setCpuCores(1.0).
+				setHeapMemoryInMB(100).
+				setGPUResource(new ResourceSpec.GPUResource(1.1)).
+				build();
+		assertTrue(rs1.lessThanOrEqual(rs3));
+		assertFalse(rs3.lessThanOrEqual(rs1));
+
+		ResourceSpec rs4 = ResourceSpec.newBuilder().
+				setCpuCores(1.0).
+				setHeapMemoryInMB(100).
+				setGPUResource(new ResourceSpec.GPUResource(2.2)).
+				build();
+		assertFalse(rs4.lessThanOrEqual(rs3));
+		assertTrue(rs3.lessThanOrEqual(rs4));
+	}
+
+	@Test
+	public void testEquals() throws Exception {
+		ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
+		ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
+		assertTrue(rs1.equals(rs2));
+		assertTrue(rs2.equals(rs1));
+
+		ResourceSpec rs3 = ResourceSpec.newBuilder().
+				setCpuCores(1.0).
+				setHeapMemoryInMB(100).
+				setGPUResource(new ResourceSpec.GPUResource(2.2)).
+				build();
+		ResourceSpec rs4 = ResourceSpec.newBuilder().
+				setCpuCores(1.0).
+				setHeapMemoryInMB(100).
+				setGPUResource(new ResourceSpec.GPUResource(1)).
+				build();
+		assertFalse(rs3.equals(rs4));
+
+		ResourceSpec rs5 = ResourceSpec.newBuilder().
+				setCpuCores(1.0).
+				setHeapMemoryInMB(100).
+				setGPUResource(new ResourceSpec.GPUResource(2.2)).
+				build();
+		assertTrue(rs3.equals(rs5));
+	}
+
+	@Test
+	public void testHashCode() throws Exception {
+		ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
+		ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
+		assertEquals(rs1.hashCode(), rs2.hashCode());
+
+		ResourceSpec rs3 = ResourceSpec.newBuilder().
+				setCpuCores(1.0).
+				setHeapMemoryInMB(100).
+				setGPUResource(new ResourceSpec.GPUResource(2.2)).
+				build();
+		ResourceSpec rs4 = ResourceSpec.newBuilder().
+				setCpuCores(1.0).
+				setHeapMemoryInMB(100).
+				setGPUResource(new ResourceSpec.GPUResource(1)).
+				build();
+		assertFalse(rs3.hashCode() == rs4.hashCode());
+
+		ResourceSpec rs5 = ResourceSpec.newBuilder().
+				setCpuCores(1.0).
+				setHeapMemoryInMB(100).
+				setGPUResource(new ResourceSpec.GPUResource(2.2)).
+				build();
+		assertEquals(rs3.hashCode(), rs5.hashCode());
+
+		ResourceSpec rs6 = ResourceSpec.newBuilder().
+				setCpuCores(1.0).
+				setHeapMemoryInMB(100).
+				setGPUResource(new ResourceSpec.GPUResource(2.2, ResourceSpec.Resource.ResourceAggregateType.AGGREGATE_TYPE_MAX)).
+				build();
+		assertFalse(rs6.hashCode() == rs5.hashCode());
+	}
+
+	@Test
+	public void testMerge() throws Exception {
+		ResourceSpec rs1 = ResourceSpec.newBuilder().
+				setCpuCores(1.0).
+				setHeapMemoryInMB(100).
+				setGPUResource(new ResourceSpec.GPUResource(1.1)).
+				build();
+		ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
+
+		ResourceSpec rs3 = rs1.merge(rs2);
+		assertEquals(1.1, rs3.getGPUResource(), 0.000001);
+
+		ResourceSpec rs4 = rs1.merge(rs3);
+		assertEquals(2.2, rs4.getGPUResource(), 0.000001);
+
+		ResourceSpec rs5 = ResourceSpec.newBuilder().
+				setCpuCores(1.0).
+				setHeapMemoryInMB(100).
+				setGPUResource(new ResourceSpec.GPUResource(1.1, ResourceSpec.Resource.ResourceAggregateType.AGGREGATE_TYPE_MAX)).
+				build();
+		try {
+			rs4.merge(rs5);
+			fail("Merge with different aggregate type should fail");
+		} catch (IllegalArgumentException ignored) {
+		}
+
+		ResourceSpec rs6 = ResourceSpec.newBuilder().
+				setCpuCores(1.0).
+				setHeapMemoryInMB(100).
+				setGPUResource(new ResourceSpec.GPUResource(1.5, ResourceSpec.Resource.ResourceAggregateType.AGGREGATE_TYPE_MAX)).
+				build();
+		ResourceSpec rs7 = rs5.merge(rs6);
+		assertEquals(1.5, rs7.getGPUResource(), 0.000001);
+
+	}
+
+	@Test
+	public void testSerializable() throws Exception {
+		ResourceSpec rs1 = ResourceSpec.newBuilder().
+				setCpuCores(1.0).
+				setHeapMemoryInMB(100).
+				setGPUResource(new ResourceSpec.GPUResource(1.1)).
+				build();
+		byte[] buffer = InstantiationUtil.serializeObject(rs1);
+		ResourceSpec rs2 = InstantiationUtil.deserializeObject(buffer, ClassLoader.getSystemClassLoader());
+		assertEquals(rs1, rs2);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5b9ac950/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java
index a99b769..d231a8c 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java
@@ -60,8 +60,8 @@ public class OperatorTest {
 		opMethod.setAccessible(true);
 
 		// verify explicit change in resources
-		ResourceSpec minResources = new ResourceSpec(1.0, 100);
-		ResourceSpec preferredResources = new ResourceSpec(2.0, 200);
+		ResourceSpec minResources = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
+		ResourceSpec preferredResources = ResourceSpec.newBuilder().setCpuCores(2.0).setHeapMemoryInMB(200).build();
 		opMethod.invoke(operator, minResources, preferredResources);
 
 		assertEquals(minResources, operator.getMinResources());

http://git-wip-us.apache.org/repos/asf/flink/blob/5b9ac950/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java
index d8ee80b..51c6a85 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/JobGraphGeneratorTest.java
@@ -50,13 +50,13 @@ public class JobGraphGeneratorTest {
 	 */
 	@Test
 	public void testResourcesForChainedOperators() throws Exception {
-		ResourceSpec resource1 = new ResourceSpec(0.1, 100);
-		ResourceSpec resource2 = new ResourceSpec(0.2, 200);
-		ResourceSpec resource3 = new ResourceSpec(0.3, 300);
-		ResourceSpec resource4 = new ResourceSpec(0.4, 400);
-		ResourceSpec resource5 = new ResourceSpec(0.5, 500);
-		ResourceSpec resource6 = new ResourceSpec(0.6, 600);
-		ResourceSpec resource7 = new ResourceSpec(0.7, 700);
+		ResourceSpec resource1 = ResourceSpec.newBuilder().setCpuCores(0.1).setHeapMemoryInMB(100).build();
+		ResourceSpec resource2 = ResourceSpec.newBuilder().setCpuCores(0.2).setHeapMemoryInMB(200).build();
+		ResourceSpec resource3 = ResourceSpec.newBuilder().setCpuCores(0.3).setHeapMemoryInMB(300).build();
+		ResourceSpec resource4 = ResourceSpec.newBuilder().setCpuCores(0.4).setHeapMemoryInMB(400).build();
+		ResourceSpec resource5 = ResourceSpec.newBuilder().setCpuCores(0.5).setHeapMemoryInMB(500).build();
+		ResourceSpec resource6 = ResourceSpec.newBuilder().setCpuCores(0.6).setHeapMemoryInMB(600).build();
+		ResourceSpec resource7 = ResourceSpec.newBuilder().setCpuCores(0.7).setHeapMemoryInMB(700).build();
 
 		Method opMethod = Operator.class.getDeclaredMethod("setResources", ResourceSpec.class);
 		opMethod.setAccessible(true);
@@ -129,12 +129,12 @@ public class JobGraphGeneratorTest {
 	 */
 	@Test
 	public void testResourcesForDeltaIteration() throws Exception{
-		ResourceSpec resource1 = new ResourceSpec(0.1, 100);
-		ResourceSpec resource2 = new ResourceSpec(0.2, 200);
-		ResourceSpec resource3 = new ResourceSpec(0.3, 300);
-		ResourceSpec resource4 = new ResourceSpec(0.4, 400);
-		ResourceSpec resource5 = new ResourceSpec(0.5, 500);
-		ResourceSpec resource6 = new ResourceSpec(0.6, 600);
+		ResourceSpec resource1 = ResourceSpec.newBuilder().setCpuCores(0.1).setHeapMemoryInMB(100).build();
+		ResourceSpec resource2 = ResourceSpec.newBuilder().setCpuCores(0.2).setHeapMemoryInMB(200).build();
+		ResourceSpec resource3 = ResourceSpec.newBuilder().setCpuCores(0.3).setHeapMemoryInMB(300).build();
+		ResourceSpec resource4 = ResourceSpec.newBuilder().setCpuCores(0.4).setHeapMemoryInMB(400).build();
+		ResourceSpec resource5 = ResourceSpec.newBuilder().setCpuCores(0.5).setHeapMemoryInMB(500).build();
+		ResourceSpec resource6 = ResourceSpec.newBuilder().setCpuCores(0.6).setHeapMemoryInMB(600).build();
 
 		Method opMethod = Operator.class.getDeclaredMethod("setResources", ResourceSpec.class);
 		opMethod.setAccessible(true);

http://git-wip-us.apache.org/repos/asf/flink/blob/5b9ac950/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index ea0e139..68738ba 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -531,26 +531,26 @@ public class DataStreamTest {
 	public void testResources() throws Exception{
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-		ResourceSpec minResource1 = new ResourceSpec(1.0, 100);
-		ResourceSpec preferredResource1 = new ResourceSpec(2.0, 200);
+		ResourceSpec minResource1 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
+		ResourceSpec preferredResource1 = ResourceSpec.newBuilder().setCpuCores(2.0).setHeapMemoryInMB(200).build();
 
-		ResourceSpec minResource2 = new ResourceSpec(1.0, 200);
-		ResourceSpec preferredResource2 = new ResourceSpec(2.0, 300);
+		ResourceSpec minResource2 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(200).build();
+		ResourceSpec preferredResource2 = ResourceSpec.newBuilder().setCpuCores(2.0).setHeapMemoryInMB(300).build();
 
-		ResourceSpec minResource3 = new ResourceSpec(1.0, 300);
-		ResourceSpec preferredResource3 = new ResourceSpec(2.0, 400);
+		ResourceSpec minResource3 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(300).build();
+		ResourceSpec preferredResource3 = ResourceSpec.newBuilder().setCpuCores(2.0).setHeapMemoryInMB(400).build();
 
-		ResourceSpec minResource4 = new ResourceSpec(1.0, 400);
-		ResourceSpec preferredResource4 = new ResourceSpec(2.0, 500);
+		ResourceSpec minResource4 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(400).build();
+		ResourceSpec preferredResource4 = ResourceSpec.newBuilder().setCpuCores(2.0).setHeapMemoryInMB(500).build();
 
-		ResourceSpec minResource5 = new ResourceSpec(1.0, 500);
-		ResourceSpec preferredResource5 = new ResourceSpec(2.0, 600);
+		ResourceSpec minResource5 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(500).build();
+		ResourceSpec preferredResource5 = ResourceSpec.newBuilder().setCpuCores(2.0).setHeapMemoryInMB(600).build();
 
-		ResourceSpec minResource6 = new ResourceSpec(1.0, 600);
-		ResourceSpec preferredResource6 = new ResourceSpec(2.0, 700);
+		ResourceSpec minResource6 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(600).build();
+		ResourceSpec preferredResource6 = ResourceSpec.newBuilder().setCpuCores(2.0).setHeapMemoryInMB(700).build();
 
-		ResourceSpec minResource7 = new ResourceSpec(1.0, 700);
-		ResourceSpec preferredResource7 = new ResourceSpec(2.0, 800);
+		ResourceSpec minResource7 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(700).build();
+		ResourceSpec preferredResource7 = ResourceSpec.newBuilder().setCpuCores(2.0).setHeapMemoryInMB(800).build();
 
 		Method opMethod = SingleOutputStreamOperator.class.getDeclaredMethod("setResources", ResourceSpec.class, ResourceSpec.class);
 		opMethod.setAccessible(true);

http://git-wip-us.apache.org/repos/asf/flink/blob/5b9ac950/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 4e94e05..eac5bc1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -165,11 +165,11 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 	 */
 	@Test
 	public void testResourcesForChainedSourceSink() throws Exception {
-		ResourceSpec resource1 = new ResourceSpec(0.1, 100);
-		ResourceSpec resource2 = new ResourceSpec(0.2, 200);
-		ResourceSpec resource3 = new ResourceSpec(0.3, 300);
-		ResourceSpec resource4 = new ResourceSpec(0.4, 400);
-		ResourceSpec resource5 = new ResourceSpec(0.5, 500);
+		ResourceSpec resource1 = ResourceSpec.newBuilder().setCpuCores(0.1).setHeapMemoryInMB(100).build();
+		ResourceSpec resource2 = ResourceSpec.newBuilder().setCpuCores(0.2).setHeapMemoryInMB(200).build();
+		ResourceSpec resource3 = ResourceSpec.newBuilder().setCpuCores(0.3).setHeapMemoryInMB(300).build();
+		ResourceSpec resource4 = ResourceSpec.newBuilder().setCpuCores(0.4).setHeapMemoryInMB(400).build();
+		ResourceSpec resource5 = ResourceSpec.newBuilder().setCpuCores(0.5).setHeapMemoryInMB(500).build();
 
 		Method opMethod = SingleOutputStreamOperator.class.getDeclaredMethod("setResources", ResourceSpec.class);
 		opMethod.setAccessible(true);
@@ -237,11 +237,11 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 	 */
 	@Test
 	public void testResourcesForIteration() throws Exception {
-		ResourceSpec resource1 = new ResourceSpec(0.1, 100);
-		ResourceSpec resource2 = new ResourceSpec(0.2, 200);
-		ResourceSpec resource3 = new ResourceSpec(0.3, 300);
-		ResourceSpec resource4 = new ResourceSpec(0.4, 400);
-		ResourceSpec resource5 = new ResourceSpec(0.5, 500);
+		ResourceSpec resource1 = ResourceSpec.newBuilder().setCpuCores(0.1).setHeapMemoryInMB(100).build();
+		ResourceSpec resource2 = ResourceSpec.newBuilder().setCpuCores(0.2).setHeapMemoryInMB(200).build();
+		ResourceSpec resource3 = ResourceSpec.newBuilder().setCpuCores(0.3).setHeapMemoryInMB(300).build();
+		ResourceSpec resource4 = ResourceSpec.newBuilder().setCpuCores(0.4).setHeapMemoryInMB(400).build();
+		ResourceSpec resource5 = ResourceSpec.newBuilder().setCpuCores(0.5).setHeapMemoryInMB(500).build();
 
 		Method opMethod = SingleOutputStreamOperator.class.getDeclaredMethod("setResources", ResourceSpec.class);
 		opMethod.setAccessible(true);