You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/28 18:36:50 UTC

[07/11] flink git commit: [FLINK-5133] [core] Followups for ResourceSpec on DataSet / DataStream API

[FLINK-5133] [core] Followups for ResourceSpec on DataSet / DataStream API

  - Correct some use of Preconditions.checkNotNull
  - Make 'resources' plural in all cases
  - Add comments on why the setters are commented out
  - Add @PublicEvolving annotations
  - Make the Scala API completeness test match Scala-esque versions of Java Getters


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9912de21
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9912de21
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9912de21

Branch: refs/heads/master
Commit: 9912de21a1053013a220707f8b3868bdbf93aaca
Parents: f37ed02
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Feb 28 10:42:16 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 28 18:59:10 2017 +0100

----------------------------------------------------------------------
 .../flink/api/common/operators/Operator.java    |  41 ++++---
 .../api/common/operators/ResourceSpec.java      |  29 ++---
 .../flink/api/java/operators/DataSink.java      | 109 ++++++++++---------
 .../api/java/operators/DeltaIteration.java      | 105 +++++++++---------
 .../flink/api/java/operators/Operator.java      | 101 +++++++++--------
 .../api/java/operators/OperatorTranslation.java |  20 ++--
 .../apache/flink/optimizer/plan/PlanNode.java   |   8 +-
 .../org/apache/flink/api/scala/DataSet.scala    |  69 ++++++------
 .../streaming/api/datastream/DataStream.java    |  18 +--
 .../api/datastream/DataStreamSink.java          |  76 +++++++------
 .../datastream/SingleOutputStreamOperator.java  |  74 +++++++------
 .../flink/streaming/api/graph/StreamGraph.java  |   2 +-
 .../api/graph/StreamGraphGenerator.java         |   4 +-
 .../flink/streaming/api/graph/StreamNode.java   |  18 +--
 .../transformations/StreamTransformation.java   |  30 ++---
 .../flink/streaming/api/scala/DataStream.scala  |  58 +++++-----
 .../ScalaAPICompletenessTestBase.scala          |  36 +++---
 17 files changed, 424 insertions(+), 374 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
index a9dedfa..1905555 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
@@ -16,18 +16,20 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.operators;
 
 import java.util.List;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Visitable;
 
+import javax.annotation.Nullable;
+
 /**
 * Abstract base class for all operators. An operator is a source, sink, or it applies an operation to
 * one or more inputs, producing a result.
@@ -45,9 +47,11 @@ public abstract class Operator<OUT> implements Visitable<Operator<?>> {
 		
 	private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;  // the number of parallel instances to use
 
-	private ResourceSpec minResource;			// the minimum resource of the contract instance.
+	@Nullable
+	private ResourceSpec minResources;          // the minimum resource of the contract instance.
 
-	private ResourceSpec preferredResource;	// the preferred resource of the contract instance.
+	@Nullable
+	private ResourceSpec preferredResources;    // the preferred resource of the contract instance.
 
 	/**
 	 * The return type of the user function.
@@ -190,35 +194,40 @@ public abstract class Operator<OUT> implements Visitable<Operator<?>> {
 	}
 
 	/**
-	 * Gets the minimum resource for this contract instance. The minimum resource denotes how many
-	 * resources will be needed in the minimum for the user function during the execution.
+	 * Gets the minimum resources for this operator. The minimum resources denote how many
+	 * resources will be needed at least for the operator or user function during the execution.
 	 *
-	 * @return The minimum resource of this operator.
+	 * @return The minimum resources of this operator.
 	 */
-	public ResourceSpec getMinResource() {
-		return this.minResource;
+	@Nullable
+	@PublicEvolving
+	public ResourceSpec getMinResources() {
+		return this.minResources;
 	}
 
 	/**
-	 * Gets the preferred resource for this contract instance. The preferred resource denotes how many
+	 * Gets the preferred resources for this contract instance. The preferred resources denote how many
 	 * resources will be needed in the maximum for the user function during the execution.
 	 *
 	 * @return The preferred resource of this operator.
 	 */
-	public ResourceSpec getPreferredResource() {
-		return this.preferredResource;
+	@Nullable
+	@PublicEvolving
+	public ResourceSpec getPreferredResources() {
+		return this.preferredResources;
 	}
 
 	/**
 	 * Sets the minimum and preferred resources for this contract instance. The resource denotes
 	 * how many memories and cpu cores of the user function will be consumed during the execution.
 	 *
-	 * @param minResource The minimum resource of this operator.
-	 * @param preferredResource The preferred resource of this operator.
+	 * @param minResources The minimum resource of this operator.
+	 * @param preferredResources The preferred resource of this operator.
 	 */
-	public void setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
-		this.minResource = minResource;
-		this.preferredResource = preferredResource;
+	@PublicEvolving
+	public void setResource(ResourceSpec minResources, ResourceSpec preferredResources) {
+		this.minResources = minResources;
+		this.preferredResources = preferredResources;
 	}
 	
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
index 1387508..0ea289a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
@@ -105,14 +105,12 @@ public class ResourceSpec implements Serializable {
 	 * @return The new resource with merged values.
 	 */
 	public ResourceSpec merge(ResourceSpec other) {
-		ResourceSpec result = new ResourceSpec(
+		return new ResourceSpec(
 				Math.max(this.cpuCores, other.cpuCores),
 				this.heapMemoryInMB + other.heapMemoryInMB,
 				this.directMemoryInMB + other.directMemoryInMB,
 				this.nativeMemoryInMB + other.nativeMemoryInMB,
 				this.stateSizeInMB + other.stateSizeInMB);
-
-		return  result;
 	}
 
 	public double getCpuCores() {
@@ -141,12 +139,8 @@ public class ResourceSpec implements Serializable {
 	 * @return True if all the values are equal or greater than 0, otherwise false.
 	 */
 	public boolean isValid() {
-		if (this.cpuCores >= 0 && this.heapMemoryInMB >= 0 && this.directMemoryInMB >= 0 &&
-				this.nativeMemoryInMB >= 0 && this.stateSizeInMB >= 0) {
-			return true;
-		} else {
-			return false;
-		}
+		return (this.cpuCores >= 0 && this.heapMemoryInMB >= 0 && this.directMemoryInMB >= 0 &&
+				this.nativeMemoryInMB >= 0 && this.stateSizeInMB >= 0);
 	}
 
 	/**
@@ -162,11 +156,7 @@ public class ResourceSpec implements Serializable {
 		int cmp3 = Integer.compare(this.directMemoryInMB, other.directMemoryInMB);
 		int cmp4 = Integer.compare(this.nativeMemoryInMB, other.nativeMemoryInMB);
 		int cmp5 = Integer.compare(this.stateSizeInMB, other.stateSizeInMB);
-		if (cmp1 <= 0 && cmp2 <= 0 && cmp3 <= 0 && cmp4 <= 0 && cmp5 <= 0) {
-			return true;
-		} else {
-			return false;
-		}
+		return (cmp1 <= 0 && cmp2 <= 0 && cmp3 <= 0 && cmp4 <= 0 && cmp5 <= 0);
 	}
 
 	@Override
@@ -186,6 +176,17 @@ public class ResourceSpec implements Serializable {
 	}
 
 	@Override
+	public int hashCode() {
+		final long cpuBits =  Double.doubleToLongBits(cpuCores);
+		int result = (int) (cpuBits ^ (cpuBits >>> 32));
+		result = 31 * result + heapMemoryInMB;
+		result = 31 * result + directMemoryInMB;
+		result = 31 * result + nativeMemoryInMB;
+		result = 31 * result + stateSizeInMB;
+		return result;
+	}
+
+	@Override
 	public String toString() {
 		return "ResourceSpec{" +
 				"cpuCores=" + cpuCores +

http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
index 3be9cc0..369e013 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
@@ -52,9 +52,9 @@ public class DataSink<T> {
 	
 	private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
 
-	private ResourceSpec minResource = ResourceSpec.UNKNOWN;
+	private ResourceSpec minResources = ResourceSpec.UNKNOWN;
 
-	private ResourceSpec preferredResource = ResourceSpec.UNKNOWN;
+	private ResourceSpec preferredResources = ResourceSpec.UNKNOWN;
 
 	private Configuration parameters;
 
@@ -285,61 +285,70 @@ public class DataSink<T> {
 	}
 
 	/**
-	 * Returns the minimum resource of this data sink. If no minimum resource has been set,
-	 * it returns the default empty resource.
+	 * Returns the minimum resources of this data sink. If no minimum resources have been set,
+	 * this returns the default resource profile.
 	 *
-	 * @return The minimum resource of this data sink.
+	 * @return The minimum resources of this data sink.
 	 */
-	public ResourceSpec getMinResource() {
-		return this.minResource;
+	@PublicEvolving
+	public ResourceSpec getMinResources() {
+		return this.minResources;
 	}
 
 	/**
-	 * Returns the preferred resource of this data sink. If no preferred resource has been set,
-	 * it returns the default empty resource.
+	 * Returns the preferred resources of this data sink. If no preferred resources have been set,
+	 * this returns the default resource profile.
 	 *
-	 * @return The preferred resource of this data sink.
+	 * @return The preferred resources of this data sink.
 	 */
-	public ResourceSpec getPreferredResource() {
-		return this.preferredResource;
+	@PublicEvolving
+	public ResourceSpec getPreferredResources() {
+		return this.preferredResources;
 	}
 
-	/**
-	 * Sets the minimum and preferred resources for this data sink. This overrides the default empty resource.
-	 *	The minimum resource must be satisfied and the preferred resource specifies the upper bound
-	 * for dynamic resource resize.
-	 *
-	 * @param minResource The minimum resource for this data sink.
-	 * @param preferredResource The preferred resource for this data sink.
-	 * @return The data sink with set minimum and preferred resources.
-	 */
-	/*
-	public DataSink<T> setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
-		Preconditions.checkNotNull(minResource != null && preferredResource != null,
-				"The min and preferred resources must be not null.");
-		Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource),
-				"The values in resource must be not less than 0 and the preferred resource must be greater than the min resource.");
-
-		this.minResource = minResource;
-		this.preferredResource = preferredResource;
-
-		return this;
-	}*/
-
-	/**
-	 * Sets the resource for this data sink. This overrides the default empty minimum and preferred resources.
-	 *
-	 * @param resource The resource for this data sink.
-	 * @return The data sink with set minimum and preferred resources.
-	 */
-	/*
-	public DataSink<T> setResource(ResourceSpec resource) {
-		Preconditions.checkNotNull(resource != null, "The resource must be not null.");
-		Preconditions.checkArgument(resource.isValid(), "The resource values must be greater than 0.");
-
-		this.minResource = resource;
-		this.preferredResource = resource;
-
-		return this;
-	}*/
+//	---------------------------------------------------------------------------
+//	 Fine-grained resource profiles are an incomplete work-in-progress feature
+//	 The setters are hence commented out at this point.
+//	---------------------------------------------------------------------------
+//
+//	/**
+//	 * Sets the minimum and preferred resources for this data sink. This overrides the default empty resource.
+//	 *	The minimum resource must be satisfied and the preferred resource specifies the upper bound
+//	 * for dynamic resource resize.
+//	 *
+//	 * @param minResources The minimum resource for this data sink.
+//	 * @param preferredResources The preferred resource for this data sink.
+//	 * @return The data sink with set minimum and preferred resources.
+//	 */
+//	@PublicEvolving
+//	public DataSink<T> setResources(ResourceSpec minResources, ResourceSpec preferredResources) {
+//		Preconditions.checkNotNull(minResources, "The min resources must be not null.");
+//		Preconditions.checkNotNull(preferredResources, "The preferred resources must be not null.");
+//		Preconditions.checkArgument(minResources.isValid() && 
+//				preferredResources.isValid() && minResources.lessThanOrEqual(preferredResources),
+//				"The values in resource must be not less than 0 and the preferred " +
+//				"resource must be greater than the min resource.");
+//
+//		this.minResources = minResources;
+//		this.preferredResources = preferredResources;
+//
+//		return this;
+//	}
+//
+//	/**
+//	 * Sets the resources for this data sink. This overrides the default resource profile.
+//	 *
+//	 * @param resources The resources for this data sink.
+//	 * @return The data sink with set minimum and preferred resources.
+//	 */
+//	@PublicEvolving
+//	public DataSink<T> setResources(ResourceSpec resources) {
+//		Preconditions.checkNotNull(resources, "The resources must be not null.");
+//		Preconditions.checkArgument(resources.isValid(), "The resource values must be greater than 0.");
+//
+//		this.minResources = resources;
+//		this.preferredResources = resources;
+//
+//		return this;
+//	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
index cf0a63e..3d327e9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
@@ -64,9 +64,9 @@ public class DeltaIteration<ST, WT> {
 	
 	private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
 
-	private ResourceSpec minResource = ResourceSpec.UNKNOWN;
+	private ResourceSpec minResources = ResourceSpec.UNKNOWN;
 
-	private ResourceSpec preferredResource = ResourceSpec.UNKNOWN;
+	private ResourceSpec preferredResources = ResourceSpec.UNKNOWN;
 	
 	private boolean solutionSetUnManaged;
 
@@ -197,65 +197,72 @@ public class DeltaIteration<ST, WT> {
 		return parallelism;
 	}
 
-	/**
-	 * Sets the minimum and preferred resources for the iteration. This overrides the default empty resource.
-	 *	The lower and upper resource limits will be considered in dynamic resource resize feature for future plan.
-	 *
-	 * @param minResource The minimum resource for the iteration.
-	 * @param preferredResource The preferred resource for the iteration.
-	 * @return The iteration with set minimum and preferred resources.
-	 */
-	/*
-	public DeltaIteration<ST, WT> setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
-		Preconditions.checkNotNull(minResource != null && preferredResource != null,
-				"The min and preferred resources must be not null.");
-		Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource),
-				"The values in resource must be not less than 0 and the preferred resource must be greater than the min resource.");
-
-		this.minResource = minResource;
-		this.preferredResource = preferredResource;
-
-		return this;
-	}*/
+//	---------------------------------------------------------------------------
+//	 Fine-grained resource profiles are an incomplete work-in-progress feature
+//	 The setters are hence commented out at this point.
+//	---------------------------------------------------------------------------
+//
+//	/**
+//	 * Sets the minimum and preferred resources for the iteration. This overrides the default empty resource.
+//	 * The lower and upper resource limits will be considered in dynamic resource resize feature for future plan.
+//	 *
+//	 * @param minResources The minimum resource for the iteration.
+//	 * @param preferredResources The preferred resource for the iteration.
+//	 * @return The iteration with set minimum and preferred resources.
+//	 */
+//	@PublicEvolving
+//	public DeltaIteration<ST, WT> setResource(ResourceSpec minResources, ResourceSpec preferredResources) {
+//		Preconditions.checkNotNull(minResources, "The min resources must be not null.");
+//		Preconditions.checkNotNull(preferredResources, "The preferred resources must be not null.");
+//		Preconditions.checkArgument(minResources.isValid() && preferredResources.isValid() && minResources.lessThanOrEqual(preferredResources),
+//				"The values in resources must be not less than 0 and the preferred resources must be greater than the min resources.");
+//
+//		this.minResources = minResources;
+//		this.preferredResources = preferredResources;
+//
+//		return this;
+//	}
+//
+//	/**
+//	 * Sets the resource for the iteration, and the minimum and preferred resources are the same by default.
+//	 *	The lower and upper resource limits will be considered in dynamic resource resize feature for future plan.
+//	 *
+//	 * @param resources The resource for the iteration.
+//	 * @return The iteration with set minimum and preferred resources.
+//	 */
+//	@PublicEvolving
+//	public DeltaIteration<ST, WT> setResource(ResourceSpec resources) {
+//		Preconditions.checkNotNull(resources, "The resources must be not null.");
+//		Preconditions.checkArgument(resources.isValid(), "The values in resource must be not less than 0.");
+//
+//		this.minResources = resources;
+//		this.preferredResources = resources;
+//
+//		return this;
+//	}
 
 	/**
-	 * Sets the resource for the iteration, and the minimum and preferred resources are the same by default.
-	 *	The lower and upper resource limits will be considered in dynamic resource resize feature for future plan.
-	 *
-	 * @param resource The resource for the iteration.
-	 * @return The iteration with set minimum and preferred resources.
-	 */
-	/*
-	public DeltaIteration<ST, WT> setResource(ResourceSpec resource) {
-		Preconditions.checkNotNull(resource != null, "The resource must be not null.");
-		Preconditions.checkArgument(resource.isValid(), "The values in resource must be not less than 0.");
-
-		this.minResource = resource;
-		this.preferredResource = resource;
-
-		return this;
-	}*/
-
-	/**
-	 * Gets the minimum resource from this iteration. If no minimum resource has been set,
+	 * Gets the minimum resources from this iteration. If no minimum resources have been set,
 	 * it returns the default empty resource.
 	 *
-	 * @return The minimum resource of the iteration.
+	 * @return The minimum resources of the iteration.
 	 */
-	public ResourceSpec getMinResource() {
-		return this.minResource;
+	@PublicEvolving
+	public ResourceSpec getMinResources() {
+		return this.minResources;
 	}
 
 	/**
-	 * Gets the preferred resource from this iteration. If no preferred resource has been set,
+	 * Gets the preferred resources from this iteration. If no preferred resources have been set,
 	 * it returns the default empty resource.
 	 *
-	 * @return The preferred resource of the iteration.
+	 * @return The preferred resources of the iteration.
 	 */
-	public ResourceSpec getPreferredResource() {
-		return this.preferredResource;
+	@PublicEvolving
+	public ResourceSpec getPreferredResources() {
+		return this.preferredResources;
 	}
-	
+
 	/**
 	 * Registers an {@link Aggregator} for the iteration. Aggregators can be used to maintain simple statistics during the
 	 * iteration, such as number of elements processed. The aggregators compute global aggregates: After each iteration step,

http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
index 79cae14..6ae59dd 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
@@ -39,9 +39,10 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<
 	
 	protected int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
 
-	protected ResourceSpec minResource = ResourceSpec.UNKNOWN;
+	protected ResourceSpec minResources = ResourceSpec.UNKNOWN;
+
+	protected ResourceSpec preferredResources = ResourceSpec.UNKNOWN;
 
-	protected ResourceSpec preferredResource = ResourceSpec.UNKNOWN;
 
 	protected Operator(ExecutionEnvironment context, TypeInformation<OUT> resultType) {
 		super(context, resultType);
@@ -81,8 +82,8 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<
 	 *
 	 * @return The minimum resource of this operator.
 	 */
-	public ResourceSpec minResource() {
-		return this.minResource;
+	public ResourceSpec getMinResources() {
+		return this.minResources;
 	}
 
 	/**
@@ -91,8 +92,8 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<
 	 *
 	 * @return The preferred resource of this operator.
 	 */
-	public ResourceSpec preferredResource() {
-		return this.preferredResource;
+	public ResourceSpec getPreferredResources() {
+		return this.preferredResources;
 	}
 
 	/**
@@ -129,45 +130,51 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<
 		return returnType;
 	}
 
-	/**
-	 * Sets the minimum and preferred resources for this operator. This overrides the default empty resource.
-	 * The lower and upper resource limits will be considered in dynamic resource resize feature for future plan.
-	 *
-	 * @param minResource The minimum resource for this operator.
-	 * @param preferredResource The preferred resource for this operator.
-	 * @return The operator with set minimum and preferred resources.
-	 */
-	/*
-	public O setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
-		Preconditions.checkNotNull(minResource != null && preferredResource != null,
-				"The min and preferred resources must be not null.");
-		Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource),
-				"The values in resource must be not less than 0 and the preferred resource must be greater than the min resource.");
-
-		this.minResource = minResource;
-		this.preferredResource = preferredResource;
-
-		@SuppressWarnings("unchecked")
-		O returnType = (O) this;
-		return returnType;
-	}*/
-
-	/**
-	 * Sets the resource for this operator. This overrides the default empty minimum and preferred resources.
-	 *
-	 * @param resource The resource for this operator.
-	 * @return The operator with set minimum and preferred resources.
-	 */
-	/*
-	public O setResource(ResourceSpec resource) {
-		Preconditions.checkNotNull(resource != null, "The resource must be not null.");
-		Preconditions.checkArgument(resource.isValid(), "The resource values must be greater than 0.");
-
-		this.minResource = resource;
-		this.preferredResource = resource;
-
-		@SuppressWarnings("unchecked")
-		O returnType = (O) this;
-		return returnType;
-	}*/
+//	---------------------------------------------------------------------------
+//	 Fine-grained resource profiles are an incomplete work-in-progress feature
+//	 The setters are hence commented out at this point.
+//	---------------------------------------------------------------------------
+//
+//	/**
+//	 * Sets the minimum and preferred resources for this operator. This overrides the default empty resource.
+//	 * The lower and upper resource limits will be considered in dynamic resource resize feature for future plan.
+//	 *
+//	 * @param minResources The minimum resource for this operator.
+//	 * @param preferredResources The preferred resource for this operator.
+//	 * @return The operator with set minimum and preferred resources.
+//	 */
+//	@PublicEvolving
+//	public O setResources(ResourceSpec minResources, ResourceSpec preferredResources) {
+//		Preconditions.checkNotNull(minResources, "The min resources must be not null.");
+//		Preconditions.checkNotNull(preferredResources, "The preferred resources must be not null.");
+//
+//		Preconditions.checkArgument(minResources.isValid() && preferredResources.isValid() && minResources.lessThanOrEqual(preferredResources),
+//				"The values in resource must be not less than 0 and the preferred resource must be greater than the min resource.");
+//
+//		this.minResources = minResources;
+//		this.preferredResources = preferredResources;
+//
+//		@SuppressWarnings("unchecked")
+//		O returnType = (O) this;
+//		return returnType;
+//	}
+//
+//	/**
+//	 * Sets the resources for this operator. This overrides the default minimum and preferred resources.
+//	 *
+//	 * @param resources The resource for this operator.
+//	 * @return The operator with set minimum and preferred resources.
+//	 */
+//	@PublicEvolving
+//	public O setResources(ResourceSpec resources) {
+//		Preconditions.checkNotNull(resources, "The resource must be not null.");
+//		Preconditions.checkArgument(resources.isValid(), "The resource values must be greater than 0.");
+//
+//		this.minResources = resources;
+//		this.preferredResources = resources;
+//
+//		@SuppressWarnings("unchecked")
+//		O returnType = (O) this;
+//		return returnType;
+//	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
index 909cd32..3bffd8b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
@@ -64,7 +64,7 @@ public class OperatorTranslation {
 		// translate the sink itself and connect it to the input
 		GenericDataSinkBase<T> translatedSink = sink.translateToDataFlow(input);
 
-		translatedSink.setResource(sink.getMinResource(), sink.getPreferredResource());
+		translatedSink.setResource(sink.getMinResources(), sink.getPreferredResources());
 
 		return translatedSink;
 	}
@@ -95,29 +95,29 @@ public class OperatorTranslation {
 		if (dataSet instanceof DataSource) {
 			DataSource<T> dataSource = (DataSource<T>) dataSet;
 			dataFlowOp = dataSource.translateToDataFlow();
-			dataFlowOp.setResource(dataSource.minResource(), dataSource.preferredResource());
+			dataFlowOp.setResource(dataSource.getMinResources(), dataSource.getPreferredResources());
 		}
 		else if (dataSet instanceof SingleInputOperator) {
 			SingleInputOperator<?, ?, ?> singleInputOperator = (SingleInputOperator<?, ?, ?>) dataSet;
 			dataFlowOp = translateSingleInputOperator(singleInputOperator);
-			dataFlowOp.setResource(singleInputOperator.minResource, singleInputOperator.preferredResource());
+			dataFlowOp.setResource(singleInputOperator.getMinResources(), singleInputOperator.getPreferredResources());
 		}
 		else if (dataSet instanceof TwoInputOperator) {
 			TwoInputOperator<?, ?, ?, ?> twoInputOperator = (TwoInputOperator<?, ?, ?, ?>) dataSet;
 			dataFlowOp = translateTwoInputOperator(twoInputOperator);
-			dataFlowOp.setResource(twoInputOperator.minResource(), twoInputOperator.preferredResource());
+			dataFlowOp.setResource(twoInputOperator.getMinResources(), twoInputOperator.getPreferredResources());
 		}
 		else if (dataSet instanceof BulkIterationResultSet) {
-			BulkIterationResultSet bulkIterationResultSet = (BulkIterationResultSet<?>) dataSet;
+			BulkIterationResultSet<?> bulkIterationResultSet = (BulkIterationResultSet<?>) dataSet;
 			dataFlowOp = translateBulkIteration(bulkIterationResultSet);
-			dataFlowOp.setResource(bulkIterationResultSet.getIterationHead().minResource(),
-					bulkIterationResultSet.getIterationHead().preferredResource());
+			dataFlowOp.setResource(bulkIterationResultSet.getIterationHead().getMinResources(),
+					bulkIterationResultSet.getIterationHead().getPreferredResources());
 		}
 		else if (dataSet instanceof DeltaIterationResultSet) {
-			DeltaIterationResultSet deltaIterationResultSet = (DeltaIterationResultSet<?, ?>) dataSet;
+			DeltaIterationResultSet<?, ?> deltaIterationResultSet = (DeltaIterationResultSet<?, ?>) dataSet;
 			dataFlowOp = translateDeltaIteration(deltaIterationResultSet);
-			dataFlowOp.setResource(deltaIterationResultSet.getIterationHead().getMinResource(),
-					deltaIterationResultSet.getIterationHead().getPreferredResource());
+			dataFlowOp.setResource(deltaIterationResultSet.getIterationHead().getMinResources(),
+					deltaIterationResultSet.getIterationHead().getPreferredResources());
 		}
 		else if (dataSet instanceof DeltaIteration.SolutionSetPlaceHolder || dataSet instanceof DeltaIteration.WorksetPlaceHolder) {
 			throw new InvalidProgramException("A data set that is part of a delta iteration was used as a sink or action."

http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
index 4ef91b3..723c532 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
@@ -310,12 +310,12 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
 		return this.parallelism;
 	}
 
-	public ResourceSpec getMinResource() {
-		return this.template.getOperator().getMinResource();
+	public ResourceSpec getMinResources() {
+		return this.template.getOperator().getMinResources();
 	}
 
-	public ResourceSpec getPreferredResource() {
-		return this.template.getOperator().getPreferredResource();
+	public ResourceSpec getPreferredResources() {
+		return this.template.getOperator().getPreferredResources();
 	}
 	
 	public long getGuaranteedAvailableMemory() {

http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 5cfb601..bfe7567 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -177,48 +177,55 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
         "parallelism.")
   }
 
-  /**
-   * Sets the minimum and preferred resources of this operation.
-   */
-  /*
-  def resource(minResource: ResourceSpec, preferredResource: ResourceSpec) : Unit = {
-    javaSet match {
-      case ds: DataSource[_] => ds.setResource(minResource, preferredResource)
-      case op: Operator[_, _] => op.setResource(minResource, preferredResource)
-      case di: DeltaIterationResultSet[_, _] =>
-        di.getIterationHead.setResource(minResource, preferredResource)
-      case _ =>
-        throw new UnsupportedOperationException("Operator does not support " +
-          "configuring custom resources specs.")
-    }
-    this
-  }*/
 
-  /**
-   * Sets the resource of this operation.
-   */
-  /*
-  def resource(resource: ResourceSpec) : Unit = {
-    this.resource(resource, resource)
-  }*/
+// ---------------------------------------------------------------------------
+//  Fine-grained resource profiles are an incomplete work-in-progress feature
+//  The setters are hence commented out at this point.
+// ---------------------------------------------------------------------------
+//  /**
+//   * Sets the minimum and preferred resources of this operation.
+//   */
+//  @PublicEvolving
+//  def resources(minResources: ResourceSpec, preferredResources: ResourceSpec) : Unit = {
+//    javaSet match {
+//      case ds: DataSource[_] => ds.setResources(minResources, preferredResources)
+//      case op: Operator[_, _] => op.setResources(minResources, preferredResources)
+//      case di: DeltaIterationResultSet[_, _] =>
+//        di.getIterationHead.setResources(minResources, preferredResources)
+//      case _ =>
+//        throw new UnsupportedOperationException("Operator does not support " +
+//          "configuring custom resources specs.")
+//    }
+//    this
+//  }
+//
+//  /**
+//   * Sets both the minimum and the preferred resources of this operation to the given spec.
+//   */
+//  @PublicEvolving
+//  def resources(resources: ResourceSpec) : Unit = {
+//    this.resources(resources, resources)
+//  }
 
   /**
-   * Returns the minimum resource of this operation.
+   * Returns the minimum resources of this operation.
    */
-  def minResource: ResourceSpec = javaSet match {
-    case ds: DataSource[_] => ds.minResource()
-    case op: Operator[_, _] => op.minResource
+  @PublicEvolving
+  def minResources: ResourceSpec = javaSet match {
+    case ds: DataSource[_] => ds.getMinResources()
+    case op: Operator[_, _] => op.getMinResources()
     case _ =>
       throw new UnsupportedOperationException("Operator does not support " +
         "configuring custom resources specs.")
   }
 
   /**
-   * Returns the preferred resource of this operation.
+   * Returns the preferred resources of this operation.
    */
-  def preferredResource: ResourceSpec = javaSet match {
-    case ds: DataSource[_] => ds.preferredResource()
-    case op: Operator[_, _] => op.preferredResource
+  @PublicEvolving
+  def preferredResources: ResourceSpec = javaSet match {
+    case ds: DataSource[_] => ds.getPreferredResources()
+    case op: Operator[_, _] => op.getPreferredResources()
     case _ =>
       throw new UnsupportedOperationException("Operator does not support " +
         "configuring custom resources specs.")

http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index ae1c39a..c443758 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -144,21 +144,23 @@ public class DataStream<T> {
 	}
 
 	/**
-	 * Gets the minimum resource for this operator.
+	 * Gets the minimum resources for this operator.
 	 *
-	 * @return The minimum resource set for this operator.
+	 * @return The minimum resources set for this operator.
 	 */
-	public ResourceSpec minResource() {
-		return transformation.getMinResource();
+	@PublicEvolving
+	public ResourceSpec getMinResources() {
+		return transformation.getMinResources();
 	}
 
 	/**
-	 * Gets the preferred resource for this operator.
+	 * Gets the preferred resources for this operator.
 	 *
-	 * @return The preferred resource set for this operator.
+	 * @return The preferred resources set for this operator.
 	 */
-	public ResourceSpec preferredResource() {
-		return transformation.getPreferredResource();
+	@PublicEvolving
+	public ResourceSpec getPreferredResources() {
+		return transformation.getPreferredResources();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
index 69e21d6..39d81c6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -32,7 +32,7 @@ import org.apache.flink.streaming.api.transformations.SinkTransformation;
 @Public
 public class DataStreamSink<T> {
 
-	SinkTransformation<T> transformation;
+	private final SinkTransformation<T> transformation;
 
 	@SuppressWarnings("unchecked")
 	protected DataStreamSink(DataStream<T> inputStream, StreamSink<T> operator) {
@@ -113,41 +113,45 @@ public class DataStreamSink<T> {
 		return this;
 	}
 
-	/**
-	 * Sets the minimum and preferred resources for this sink, and the lower and upper resource limits will
-	 * be considered in resource resize feature for future plan.
-	 *
-	 * @param minResource The minimum resource for this sink.
-	 * @param preferredResource The preferred resource for this sink
-	 * @return The sink with set minimum and preferred resources.
-	 */
-	/*
-	public DataStreamSink<T> setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
-		Preconditions.checkNotNull(minResource != null && preferredResource != null,
-				"The min and preferred resources must be not null.");
-		Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource),
-				"The values in resource must be not less than 0 and the preferred resource must be greater than the min resource.");
-
-		transformation.setResource(minResource, preferredResource);
-
-		return this;
-	}*/
-
-	/**
-	 * Sets the resource for this sink, the minimum and preferred resources are the same by default.
-	 *
-	 * @param resource The resource for this sink.
-	 * @return The sink with set minimum and preferred resources.
-	 */
-	/*
-	public DataStreamSink<T> setResource(ResourceSpec resource) {
-		Preconditions.checkNotNull(resource != null, "The resource must be not null.");
-		Preconditions.checkArgument(resource.isValid(), "The resource values must be greater than 0.");
-
-		transformation.setResource(resource, resource);
-
-		return this;
-	}*/
+//	---------------------------------------------------------------------------
+//	 Fine-grained resource profiles are an incomplete work-in-progress feature
+//	 The setters are hence commented out at this point.
+//	---------------------------------------------------------------------------
+//	/**
+//	 * Sets the minimum and preferred resources for this sink, and the lower and upper resource limits will
+//	 * be considered in resource resize feature for future plan.
+//	 *
+//	 * @param minResources The minimum resources for this sink.
+//	 * @param preferredResources The preferred resources for this sink
+//	 * @return The sink with set minimum and preferred resources.
+//	 */
+//	@PublicEvolving
+//	public DataStreamSink<T> setResources(ResourceSpec minResources, ResourceSpec preferredResources) {
+//		Preconditions.checkNotNull(minResources, "The min resources must be not null.");
+//		Preconditions.checkNotNull(preferredResources, "The preferred resources must be not null.");
+//		Preconditions.checkArgument(minResources.isValid() && preferredResources.isValid() && minResources.lessThanOrEqual(preferredResources),
+//				"The values in resource must be not less than 0 and the preferred resource must be greater than the min resource.");
+//
+//		transformation.setResources(minResources, preferredResources);
+//
+//		return this;
+//	}
+//
+//	/**
+//	 * Sets the resources for this sink, the minimum and preferred resources are the same by default.
+//	 *
+//	 * @param resources The resources for this sink.
+//	 * @return The sink with set minimum and preferred resources.
+//	 */
+//	@PublicEvolving
+//	public DataStreamSink<T> setResources(ResourceSpec resources) {
+//		Preconditions.checkNotNull(resources, "The resource must be not null.");
+//		Preconditions.checkArgument(resources.isValid(), "The resource values must be greater than 0.");
+//
+//		transformation.setResources(resources, resources);
+//
+//		return this;
+//	}
 
 	/**
 	 * Turns off chaining for this operator so thread co-location will not be

http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index d856603..859c6d5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -154,41 +154,45 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
 		return this;
 	}
 
-	/**
-	 * Sets the minimum and preferred resources for this operator, and the lower and upper resource limits will
-	 * be considered in dynamic resource resize feature for future plan.
-	 *
-	 * @param minResource The minimum resource for this operator.
-	 * @param preferredResource The preferred resource for this operator.
-	 * @return The operator with set minimum and preferred resources.
-	 */
-	/*
-	public SingleOutputStreamOperator<T> setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
-		Preconditions.checkArgument(minResource != null && preferredResource != null,
-				"The min and preferred resources must be not null.");
-		Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource),
-				"The values in resource must be not less than 0 and the preferred resource must be greater than the min resource.");
-
-		transformation.setResource(minResource, preferredResource);
-
-		return this;
-	}*/
-
-	/**
-	 * Sets the resource for this operator, the minimum and preferred resources are the same by default.
-	 *
-	 * @param resource The resource for this operator.
-	 * @return The operator with set minimum and preferred resources.
-	 */
-	/*
-	public SingleOutputStreamOperator<T> setResource(ResourceSpec resource) {
-		Preconditions.checkNotNull(resource != null, "The resource must be not null.");
-		Preconditions.checkArgument(resource.isValid(), "The resource values must be greater than 0.");
-
-		transformation.setResource(resource, resource);
-
-		return this;
-	}*/
+//	---------------------------------------------------------------------------
+//	 Fine-grained resource profiles are an incomplete work-in-progress feature
+//	 The setters are hence commented out at this point.
+//	---------------------------------------------------------------------------
+//	/**
+//	 * Sets the minimum and preferred resources for this operator, and the lower and upper resource limits will
+//	 * be considered in dynamic resource resize feature for future plan.
+//	 *
+//	 * @param minResources The minimum resources for this operator.
+//	 * @param preferredResources The preferred resources for this operator.
+//	 * @return The operator with set minimum and preferred resources.
+//	 */
+//	@PublicEvolving
+//	public SingleOutputStreamOperator<T> setResources(ResourceSpec minResources, ResourceSpec preferredResources) {
+//		Preconditions.checkNotNull(minResources, "The min resources must be not null.");
+//		Preconditions.checkNotNull(preferredResources, "The preferred resources must be not null.");
+//		Preconditions.checkArgument(minResources.isValid() && preferredResources.isValid() && minResources.lessThanOrEqual(preferredResources),
+//				"The values in resource must be not less than 0 and the preferred resource must be greater than the min resource.");
+//
+//		transformation.setResources(minResources, preferredResources);
+//
+//		return this;
+//	}
+//
+//	/**
+//	 * Sets the resources for this operator, the minimum and preferred resources are the same by default.
+//	 *
+//	 * @param resources The resources for this operator.
+//	 * @return The operator with set minimum and preferred resources.
+//	 */
+//	@PublicEvolving
+//	public SingleOutputStreamOperator<T> setResources(ResourceSpec resources) {
+//		Preconditions.checkNotNull(resources, "The resource must be not null.");
+//		Preconditions.checkArgument(resources.isValid(), "The resource values must be greater than 0.");
+//
+//		transformation.setResources(resources, resources);
+//
+//		return this;
+//	}
 
 	private boolean canBeParallel() {
 		return !nonParallel;

http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index fcbc607..a87e63d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -416,7 +416,7 @@ public class StreamGraph extends StreamingPlan {
 
 	public void setResource(int vertexID, ResourceSpec minResource, ResourceSpec preferredResource) {
 		if (getStreamNode(vertexID) != null) {
-			getStreamNode(vertexID).setResource(minResource, preferredResource);
+			getStreamNode(vertexID).setResources(minResource, preferredResource);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index af92421..f55ff47 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -202,8 +202,8 @@ public class StreamGraphGenerator {
 			streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
 		}
 
-		if (transform.getMinResource() != null && transform.getPreferredResource() != null) {
-			streamGraph.setResource(transform.getId(), transform.getMinResource(), transform.getPreferredResource());
+		if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
+			streamGraph.setResource(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
 		}
 
 		return transformedIds;

http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 0bf9adf..2d2e1e75 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -49,8 +49,8 @@ public class StreamNode implements Serializable {
 	 * dynamic scaling and the number of key groups used for partitioned state.
 	 */
 	private int maxParallelism;
-	private ResourceSpec minResource;
-	private ResourceSpec preferredResource;
+	private ResourceSpec minResources;
+	private ResourceSpec preferredResources;
 	private Long bufferTimeout = null;
 	private final String operatorName;
 	private String slotSharingGroup;
@@ -168,17 +168,17 @@ public class StreamNode implements Serializable {
 		this.maxParallelism = maxParallelism;
 	}
 
-	public ResourceSpec getMinResource() {
-		return minResource;
+	public ResourceSpec getMinResources() {
+		return minResources;
 	}
 
-	public ResourceSpec getPreferredResource() {
-		return preferredResource;
+	public ResourceSpec getPreferredResources() {
+		return preferredResources;
 	}
 
-	public void setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
-		this.minResource = minResource;
-		this.preferredResource = preferredResource;
+	public void setResources(ResourceSpec minResources, ResourceSpec preferredResources) {
+		this.minResources = minResources;
+		this.preferredResources = preferredResources;
 	}
 
 	public Long getBufferTimeout() {

http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
index 1d22454..24b5736 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
@@ -30,6 +30,8 @@ import org.apache.flink.util.Preconditions;
 
 import java.util.Collection;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A {@code StreamTransformation} represents the operation that creates a
  * {@link org.apache.flink.streaming.api.datastream.DataStream}. Every
@@ -127,16 +129,16 @@ public abstract class StreamTransformation<T> {
 	private int maxParallelism = -1;
 
 	/**
-	 *  The minimum resource for this stream transformation. It defines the lower limit for
-	 *  dynamic resource resize in future plan.
+	 *  The minimum resources for this stream transformation. It defines the lower limit for
+	 *  dynamic resource resize in future plan.
 	 */
-	private ResourceSpec minResource = ResourceSpec.UNKNOWN;
+	private ResourceSpec minResources = ResourceSpec.UNKNOWN;
 
 	/**
-	 *  The preferred resource for this stream transformation. It defines the upper limit for
+	 *  The preferred resources for this stream transformation. It defines the upper limit for
 	 *  dynamic resource resize in future plan.
 	 */
-	private ResourceSpec preferredResource = ResourceSpec.UNKNOWN;
+	private ResourceSpec preferredResources = ResourceSpec.UNKNOWN;
 
 	/**
 	 * User-specified ID for this transformation. This is used to assign the
@@ -229,12 +231,12 @@ public abstract class StreamTransformation<T> {
 	/**
 	 * Sets the minimum and preferred resources for this stream transformation.
 	 *
-	 * @param minResource The minimum resource of this transformation.
-	 * @param preferredResource The preferred resource of this transformation.
+	 * @param minResources The minimum resources of this transformation.
+	 * @param preferredResources The preferred resources of this transformation.
 	 */
-	public void setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
-		this.minResource = minResource;
-		this.preferredResource = preferredResource;
+	public void setResources(ResourceSpec minResources, ResourceSpec preferredResources) {
+		this.minResources = checkNotNull(minResources);
+		this.preferredResources = checkNotNull(preferredResources);
 	}
 
 	/**
@@ -242,8 +244,8 @@ public abstract class StreamTransformation<T> {
 	 *
 	 * @return The minimum resource of this transformation.
 	 */
-	public ResourceSpec getMinResource() {
-		return minResource;
+	public ResourceSpec getMinResources() {
+		return minResources;
 	}
 
 	/**
@@ -251,8 +253,8 @@ public abstract class StreamTransformation<T> {
 	 *
 	 * @return The preferred resource of this transformation.
 	 */
-	public ResourceSpec getPreferredResource() {
-		return preferredResource;
+	public ResourceSpec getPreferredResources() {
+		return preferredResources;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index e42fb3f..35e1f23 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -146,36 +146,42 @@ class DataStream[T](stream: JavaStream[T]) {
   }
 
   /**
-   * Returns the minimum resource of this operation.
+   * Returns the minimum resources of this operation.
    */
-  def minResource: ResourceSpec = stream.minResource()
-
-  /**
-   * Returns the preferred resource of this operation.
-   */
-  def preferredResource: ResourceSpec = stream.preferredResource()
-
-  /**
-   * Sets the minimum and preferred resources of this operation.
-   */
-  /*
-  def resource(minResource: ResourceSpec, preferredResource: ResourceSpec) : DataStream[T] =
-    stream match {
-      case stream : SingleOutputStreamOperator[T] => asScalaStream(stream.setResource(
-        minResource, preferredResource))
-      case _ =>
-        throw new UnsupportedOperationException("Operator does not support " +
-          "configuring custom resources specs.")
-      this
-  }*/
+  @PublicEvolving
+  def minResources: ResourceSpec = stream.getMinResources()
 
   /**
-   * Sets the resource of this operation.
+   * Returns the preferred resources of this operation.
    */
-  /*
-  def resource(resource: ResourceSpec) : Unit = {
-    this.resource(resource, resource)
-  }*/
+  @PublicEvolving
+  def preferredResources: ResourceSpec = stream.getPreferredResources()
+
+// ---------------------------------------------------------------------------
+//  Fine-grained resource profiles are an incomplete work-in-progress feature
+//  The setters are hence commented out at this point.
+// ---------------------------------------------------------------------------
+//  /**
+//   * Sets the minimum and preferred resources of this operation.
+//   */
+//  @PublicEvolving
+//  def resources(minResources: ResourceSpec, preferredResources: ResourceSpec) : DataStream[T] =
+//    stream match {
+//      case stream : SingleOutputStreamOperator[T] => asScalaStream(
+//        stream.setResources(minResources, preferredResources))
+//      case _ =>
+//        throw new UnsupportedOperationException("Operator does not support " +
+//          "configuring custom resources specs.")
+//      this
+//  }
+//
+//  /**
+//   * Sets both the minimum and the preferred resources of this operation to the given spec.
+//   */
+//  @PublicEvolving
+//  def resources(resources: ResourceSpec) : Unit = {
+//    this.resources(resources, resources)
+//  }
 
   /**
    * Gets the name of the current data stream. This name is

http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala
index 907ad9f..7abb392 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala
@@ -41,21 +41,6 @@ abstract class ScalaAPICompletenessTestBase extends TestLogger {
   protected def isExcludedByName(method: Method): Boolean
 
   /**
-   * Determines whether a method is excluded by an interface it uses.
-   */
-  protected def isExcludedByInterface(method: Method): Boolean = {
-    val excludedInterfaces =
-      Set("org.apache.spark.Logging", "org.apache.hadoop.mapreduce.HadoopMapReduceUtil")
-    def toComparisionKey(method: Method) =
-      (method.getReturnType, method.getName, method.getGenericReturnType)
-    val interfaces = method.getDeclaringClass.getInterfaces.filter { i =>
-      excludedInterfaces.contains(i.getName)
-    }
-    val excludedMethods = interfaces.flatMap(_.getMethods.map(toComparisionKey))
-    excludedMethods.contains(toComparisionKey(method))
-  }
-
-  /**
    * Utility to be called during the test.
    */
   protected def checkMethods(
@@ -66,26 +51,33 @@ abstract class ScalaAPICompletenessTestBase extends TestLogger {
     val javaMethods = javaClass.getMethods
       .filterNot(_.isAccessible)
       .filterNot(isExcludedByName)
-      .filterNot(isExcludedByInterface)
       .map(m => m.getName).toSet
 
     val scalaMethods = scalaClass.getMethods
       .filterNot(_.isAccessible)
       .filterNot(isExcludedByName)
-      .filterNot(isExcludedByInterface)
       .map(m => m.getName).toSet
 
     val missingMethods = javaMethods -- scalaMethods
 
-    for (method <- missingMethods) {
-      fail("Method " + method + " from " + javaClass + " is missing from " + scalaClassName + ".")
+    for (javaMethod <- missingMethods) {
+      // check if the method simply follows different getter / setter conventions in Scala / Java
+      // for example Java: getFoo() should match Scala: foo()
+      if (!containsScalaGetterLike(javaMethod, scalaMethods)) {
+        fail(s"Method $javaMethod from $javaClass is missing from $scalaClassName.")
+      }
     }
   }
 
-  protected def checkEquality(scalaInstance: AnyRef, extractJavaFun : ((AnyRef) => AnyRef)) {
-    val javaInstance = extractJavaFun(scalaInstance)
+  protected def containsScalaGetterLike(javaMethod: String, scalaMethods: Set[String]): Boolean = {
+    if (javaMethod.startsWith("get") && javaMethod.length >= 4) {
+      val scalaMethodName = Character.toLowerCase(javaMethod.charAt(3)) + javaMethod.substring(4)
+      scalaMethods.contains(scalaMethodName)
+    } else {
+      false
+    }
   }
-
+  
   /**
    * Tests to be performed to ensure API completeness.
    */