You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/28 18:36:46 UTC

[03/11] flink git commit: [FLINK-5133] [core] Add new setResource API for DataStream and DataSet

[FLINK-5133] [core] Add new setResource API for DataStream and DataSet

This introduces the internals, but does not yet make it public in the API.

This closes #3303


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f37ed029
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f37ed029
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f37ed029

Branch: refs/heads/master
Commit: f37ed02909617da4e73a01173b67373369ec3bc8
Parents: c24c7ec
Author: \u6dd8\u6c5f <ta...@alibaba-inc.com>
Authored: Tue Feb 14 12:37:18 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 28 18:59:10 2017 +0100

----------------------------------------------------------------------
 .../flink/api/common/operators/Operator.java    | 36 ++++++++
 .../flink/api/java/operators/DataSink.java      | 64 ++++++++++++++
 .../api/java/operators/DeltaIteration.java      | 65 +++++++++++++-
 .../flink/api/java/operators/Operator.java      | 67 ++++++++++++++
 .../api/java/operators/OperatorTranslation.java | 26 ++++--
 .../flink/api/java/operator/OperatorTest.java   | 15 ++++
 .../apache/flink/optimizer/plan/PlanNode.java   |  9 ++
 .../org/apache/flink/api/scala/DataSet.scala    | 49 ++++++++++-
 .../streaming/api/datastream/DataStream.java    | 19 ++++
 .../api/datastream/DataStreamSink.java          | 36 ++++++++
 .../datastream/SingleOutputStreamOperator.java  | 36 ++++++++
 .../flink/streaming/api/graph/StreamGraph.java  |  7 ++
 .../api/graph/StreamGraphGenerator.java         |  4 +
 .../flink/streaming/api/graph/StreamNode.java   | 16 ++++
 .../transformations/StreamTransformation.java   | 42 +++++++++
 .../flink/streaming/api/DataStreamTest.java     | 92 ++++++++++++++++++++
 .../flink/streaming/api/scala/DataStream.scala  | 33 +++++++
 .../streaming/api/scala/DataStreamTest.scala    | 79 ++++++++++++++++-
 18 files changed, 686 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
index 7e70fd7..a9dedfa 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
@@ -45,6 +45,10 @@ public abstract class Operator<OUT> implements Visitable<Operator<?>> {
 		
 	private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;  // the number of parallel instances to use
 
+	private ResourceSpec minResource;			// the minimum resource of the contract instance.
+
+	private ResourceSpec preferredResource;	// the preferred resource of the contract instance.
+
 	/**
 	 * The return type of the user function.
 	 */
@@ -184,6 +188,38 @@ public abstract class Operator<OUT> implements Visitable<Operator<?>> {
 	public void setParallelism(int parallelism) {
 		this.parallelism = parallelism;
 	}
+
+	/**
+	 * Gets the minimum resource for this contract instance. The minimum resource denotes how many
+	 * resources will be needed in the minimum for the user function during the execution.
+	 *
+	 * @return The minimum resource of this operator.
+	 */
+	public ResourceSpec getMinResource() {
+		return this.minResource;
+	}
+
+	/**
+	 * Gets the preferred resource for this contract instance. The preferred resource denotes how many
+	 * resources will be needed in the maximum for the user function during the execution.
+	 *
+	 * @return The preferred resource of this operator.
+	 */
+	public ResourceSpec getPreferredResource() {
+		return this.preferredResource;
+	}
+
+	/**
+	 * Sets the minimum and preferred resources for this contract instance. The resource denotes
+	 * how many memories and cpu cores of the user function will be consumed during the execution.
+	 *
+	 * @param minResource The minimum resource of this operator.
+	 * @param preferredResource The preferred resource of this operator.
+	 */
+	public void setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
+		this.minResource = minResource;
+		this.preferredResource = preferredResource;
+	}
 	
 	
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
index 8b419d9..3be9cc0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -51,6 +52,10 @@ public class DataSink<T> {
 	
 	private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
 
+	private ResourceSpec minResource = ResourceSpec.UNKNOWN;
+
+	private ResourceSpec preferredResource = ResourceSpec.UNKNOWN;
+
 	private Configuration parameters;
 
 	private int[] sortKeyPositions;
@@ -278,4 +283,63 @@ public class DataSink<T> {
 
 		return this;
 	}
+
+	/**
+	 * Returns the minimum resource of this data sink. If no minimum resource has been set,
+	 * it returns the default empty resource.
+	 *
+	 * @return The minimum resource of this data sink.
+	 */
+	public ResourceSpec getMinResource() {
+		return this.minResource;
+	}
+
+	/**
+	 * Returns the preferred resource of this data sink. If no preferred resource has been set,
+	 * it returns the default empty resource.
+	 *
+	 * @return The preferred resource of this data sink.
+	 */
+	public ResourceSpec getPreferredResource() {
+		return this.preferredResource;
+	}
+
+	/**
+	 * Sets the minimum and preferred resources for this data sink. This overrides the default empty resource.
+	 *	The minimum resource must be satisfied and the preferred resource specifies the upper bound
+	 * for dynamic resource resize.
+	 *
+	 * @param minResource The minimum resource for this data sink.
+	 * @param preferredResource The preferred resource for this data sink.
+	 * @return The data sink with set minimum and preferred resources.
+	 */
+	/*
+	public DataSink<T> setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
+		Preconditions.checkNotNull(minResource != null && preferredResource != null,
+				"The min and preferred resources must be not null.");
+		Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource),
+				"The values in resource must be not less than 0 and the preferred resource must be greater than the min resource.");
+
+		this.minResource = minResource;
+		this.preferredResource = preferredResource;
+
+		return this;
+	}*/
+
+	/**
+	 * Sets the resource for this data sink. This overrides the default empty minimum and preferred resources.
+	 *
+	 * @param resource The resource for this data sink.
+	 * @return The data sink with set minimum and preferred resources.
+	 */
+	/*
+	public DataSink<T> setResource(ResourceSpec resource) {
+		Preconditions.checkNotNull(resource != null, "The resource must be not null.");
+		Preconditions.checkArgument(resource.isValid(), "The resource values must be greater than 0.");
+
+		this.minResource = resource;
+		this.preferredResource = resource;
+
+		return this;
+	}*/
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
index b97a9de..cf0a63e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.aggregators.AggregatorRegistry;
 import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
 import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -62,10 +63,13 @@ public class DeltaIteration<ST, WT> {
 	private String name;
 	
 	private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
+
+	private ResourceSpec minResource = ResourceSpec.UNKNOWN;
+
+	private ResourceSpec preferredResource = ResourceSpec.UNKNOWN;
 	
 	private boolean solutionSetUnManaged;
 
-
 	public DeltaIteration(ExecutionEnvironment context, TypeInformation<ST> type, DataSet<ST> solutionSet, DataSet<WT> workset, Keys<ST> keys, int maxIterations) {
 		initialSolutionSet = solutionSet;
 		initialWorkset = workset;
@@ -192,6 +196,65 @@ public class DeltaIteration<ST, WT> {
 	public int getParallelism() {
 		return parallelism;
 	}
+
+	/**
+	 * Sets the minimum and preferred resources for the iteration. This overrides the default empty resource.
+	 *	The lower and upper resource limits will be considered in dynamic resource resize feature for future plan.
+	 *
+	 * @param minResource The minimum resource for the iteration.
+	 * @param preferredResource The preferred resource for the iteration.
+	 * @return The iteration with set minimum and preferred resources.
+	 */
+	/*
+	public DeltaIteration<ST, WT> setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
+		Preconditions.checkNotNull(minResource != null && preferredResource != null,
+				"The min and preferred resources must be not null.");
+		Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource),
+				"The values in resource must be not less than 0 and the preferred resource must be greater than the min resource.");
+
+		this.minResource = minResource;
+		this.preferredResource = preferredResource;
+
+		return this;
+	}*/
+
+	/**
+	 * Sets the resource for the iteration, and the minimum and preferred resources are the same by default.
+	 *	The lower and upper resource limits will be considered in dynamic resource resize feature for future plan.
+	 *
+	 * @param resource The resource for the iteration.
+	 * @return The iteration with set minimum and preferred resources.
+	 */
+	/*
+	public DeltaIteration<ST, WT> setResource(ResourceSpec resource) {
+		Preconditions.checkNotNull(resource != null, "The resource must be not null.");
+		Preconditions.checkArgument(resource.isValid(), "The values in resource must be not less than 0.");
+
+		this.minResource = resource;
+		this.preferredResource = resource;
+
+		return this;
+	}*/
+
+	/**
+	 * Gets the minimum resource from this iteration. If no minimum resource has been set,
+	 * it returns the default empty resource.
+	 *
+	 * @return The minimum resource of the iteration.
+	 */
+	public ResourceSpec getMinResource() {
+		return this.minResource;
+	}
+
+	/**
+	 * Gets the preferred resource from this iteration. If no preferred resource has been set,
+	 * it returns the default empty resource.
+	 *
+	 * @return The preferred resource of the iteration.
+	 */
+	public ResourceSpec getPreferredResource() {
+		return this.preferredResource;
+	}
 	
 	/**
 	 * Registers an {@link Aggregator} for the iteration. Aggregators can be used to maintain simple statistics during the

http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
index 323d23e..79cae14 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.operators;
 
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -38,6 +39,10 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<
 	
 	protected int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
 
+	protected ResourceSpec minResource = ResourceSpec.UNKNOWN;
+
+	protected ResourceSpec preferredResource = ResourceSpec.UNKNOWN;
+
 	protected Operator(ExecutionEnvironment context, TypeInformation<OUT> resultType) {
 		super(context, resultType);
 	}
@@ -71,6 +76,26 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<
 	}
 
 	/**
+	 * Returns the minimum resource of this operator. If no minimum resource has been set,
+	 * it returns the default empty resource.
+	 *
+	 * @return The minimum resource of this operator.
+	 */
+	public ResourceSpec minResource() {
+		return this.minResource;
+	}
+
+	/**
+	 * Returns the preferred resource of this operator. If no preferred resource has been set,
+	 * it returns the default empty resource.
+	 *
+	 * @return The preferred resource of this operator.
+	 */
+	public ResourceSpec preferredResource() {
+		return this.preferredResource;
+	}
+
+	/**
 	 * Sets the name of this operator. This overrides the default name, which is either
 	 * a generated description of the operation (such as for example "Aggregate(1:SUM, 2:MIN)")
 	 * or the name the user-defined function or input/output format executed by the operator.
@@ -103,4 +128,46 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<
 		O returnType = (O) this;
 		return returnType;
 	}
+
+	/**
+	 * Sets the minimum and preferred resources for this operator. This overrides the default empty resource.
+	 * The lower and upper resource limits will be considered in dynamic resource resize feature for future plan.
+	 *
+	 * @param minResource The minimum resource for this operator.
+	 * @param preferredResource The preferred resource for this operator.
+	 * @return The operator with set minimum and preferred resources.
+	 */
+	/*
+	public O setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
+		Preconditions.checkNotNull(minResource != null && preferredResource != null,
+				"The min and preferred resources must be not null.");
+		Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource),
+				"The values in resource must be not less than 0 and the preferred resource must be greater than the min resource.");
+
+		this.minResource = minResource;
+		this.preferredResource = preferredResource;
+
+		@SuppressWarnings("unchecked")
+		O returnType = (O) this;
+		return returnType;
+	}*/
+
+	/**
+	 * Sets the resource for this operator. This overrides the default empty minimum and preferred resources.
+	 *
+	 * @param resource The resource for this operator.
+	 * @return The operator with set minimum and preferred resources.
+	 */
+	/*
+	public O setResource(ResourceSpec resource) {
+		Preconditions.checkNotNull(resource != null, "The resource must be not null.");
+		Preconditions.checkArgument(resource.isValid(), "The resource values must be greater than 0.");
+
+		this.minResource = resource;
+		this.preferredResource = resource;
+
+		@SuppressWarnings("unchecked")
+		O returnType = (O) this;
+		return returnType;
+	}*/
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
index 88c9c37..909cd32 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
@@ -63,7 +63,9 @@ public class OperatorTranslation {
 		
 		// translate the sink itself and connect it to the input
 		GenericDataSinkBase<T> translatedSink = sink.translateToDataFlow(input);
-				
+
+		translatedSink.setResource(sink.getMinResource(), sink.getPreferredResource());
+
 		return translatedSink;
 	}
 	
@@ -91,19 +93,31 @@ public class OperatorTranslation {
 		Operator<T> dataFlowOp;
 		
 		if (dataSet instanceof DataSource) {
-			dataFlowOp = ((DataSource<T>) dataSet).translateToDataFlow();
+			DataSource<T> dataSource = (DataSource<T>) dataSet;
+			dataFlowOp = dataSource.translateToDataFlow();
+			dataFlowOp.setResource(dataSource.minResource(), dataSource.preferredResource());
 		}
 		else if (dataSet instanceof SingleInputOperator) {
-			dataFlowOp = translateSingleInputOperator((SingleInputOperator<?, ?, ?>) dataSet);
+			SingleInputOperator<?, ?, ?> singleInputOperator = (SingleInputOperator<?, ?, ?>) dataSet;
+			dataFlowOp = translateSingleInputOperator(singleInputOperator);
+			dataFlowOp.setResource(singleInputOperator.minResource, singleInputOperator.preferredResource());
 		}
 		else if (dataSet instanceof TwoInputOperator) {
-			dataFlowOp = translateTwoInputOperator((TwoInputOperator<?, ?, ?, ?>) dataSet);
+			TwoInputOperator<?, ?, ?, ?> twoInputOperator = (TwoInputOperator<?, ?, ?, ?>) dataSet;
+			dataFlowOp = translateTwoInputOperator(twoInputOperator);
+			dataFlowOp.setResource(twoInputOperator.minResource(), twoInputOperator.preferredResource());
 		}
 		else if (dataSet instanceof BulkIterationResultSet) {
-			dataFlowOp = translateBulkIteration((BulkIterationResultSet<?>) dataSet);
+			BulkIterationResultSet bulkIterationResultSet = (BulkIterationResultSet<?>) dataSet;
+			dataFlowOp = translateBulkIteration(bulkIterationResultSet);
+			dataFlowOp.setResource(bulkIterationResultSet.getIterationHead().minResource(),
+					bulkIterationResultSet.getIterationHead().preferredResource());
 		}
 		else if (dataSet instanceof DeltaIterationResultSet) {
-			dataFlowOp = translateDeltaIteration((DeltaIterationResultSet<?, ?>) dataSet);
+			DeltaIterationResultSet deltaIterationResultSet = (DeltaIterationResultSet<?, ?>) dataSet;
+			dataFlowOp = translateDeltaIteration(deltaIterationResultSet);
+			dataFlowOp.setResource(deltaIterationResultSet.getIterationHead().getMinResource(),
+					deltaIterationResultSet.getIterationHead().getPreferredResource());
 		}
 		else if (dataSet instanceof DeltaIteration.SolutionSetPlaceHolder || dataSet instanceof DeltaIteration.WorksetPlaceHolder) {
 			throw new InvalidProgramException("A data set that is part of a delta iteration was used as a sink or action."

http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java
index a69ca3c..992acc9 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.java.operator;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.Operator;
 import org.apache.flink.api.java.typeutils.ValueTypeInfo;
@@ -45,6 +46,20 @@ public class OperatorTest {
 		assertEquals(parallelism, operator.getParallelism());
 	}
 
+	/*
+	@Test
+	public void testConfigurationOfResource() {
+		Operator operator = new MockOperator();
+
+		// verify explicit change in resource
+		ResourceSpec minResource = new ResourceSpec(1.0, 100, 0, 0, 0);
+		ResourceSpec preferredResource = new ResourceSpec(2.0, 200, 0, 0, 0);
+		operator.setResource(minResource, preferredResource);
+
+		assertEquals(minResource, operator.getMinResource());
+		assertEquals(preferredResource, operator.getPreferredResource());
+	}*/
+
 	private class MockOperator extends Operator {
 		public MockOperator() {
 			super(ExecutionEnvironment.createCollectionsEnvironment(), ValueTypeInfo.NULL_VALUE_TYPE_INFO);

http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
index b30fa36..4ef91b3 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
@@ -19,6 +19,7 @@
 package org.apache.flink.optimizer.plan;
 
 import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.costs.Costs;
@@ -308,6 +309,14 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
 	public int getParallelism() {
 		return this.parallelism;
 	}
+
+	public ResourceSpec getMinResource() {
+		return this.template.getOperator().getMinResource();
+	}
+
+	public ResourceSpec getPreferredResource() {
+		return this.template.getOperator().getPreferredResource();
+	}
 	
 	public long getGuaranteedAvailableMemory() {
 		return this.template.getMinimalMemoryAcrossAllSubTasks();

http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 4e7be04..5cfb601 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.accumulators.SerializedListAccumulator
 import org.apache.flink.api.common.aggregators.Aggregator
 import org.apache.flink.api.common.functions._
 import org.apache.flink.api.common.io.{FileOutputFormat, OutputFormat}
-import org.apache.flink.api.common.operators.{Keys, Order}
+import org.apache.flink.api.common.operators.{ResourceSpec, Keys, Order}
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
 import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
 import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod
@@ -178,6 +178,53 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   }
 
   /**
+   * Sets the minimum and preferred resources of this operation.
+   */
+  /*
+  def resource(minResource: ResourceSpec, preferredResource: ResourceSpec) : Unit = {
+    javaSet match {
+      case ds: DataSource[_] => ds.setResource(minResource, preferredResource)
+      case op: Operator[_, _] => op.setResource(minResource, preferredResource)
+      case di: DeltaIterationResultSet[_, _] =>
+        di.getIterationHead.setResource(minResource, preferredResource)
+      case _ =>
+        throw new UnsupportedOperationException("Operator does not support " +
+          "configuring custom resources specs.")
+    }
+    this
+  }*/
+
+  /**
+   * Sets the resource of this operation.
+   */
+  /*
+  def resource(resource: ResourceSpec) : Unit = {
+    this.resource(resource, resource)
+  }*/
+
+  /**
+   * Returns the minimum resource of this operation.
+   */
+  def minResource: ResourceSpec = javaSet match {
+    case ds: DataSource[_] => ds.minResource()
+    case op: Operator[_, _] => op.minResource
+    case _ =>
+      throw new UnsupportedOperationException("Operator does not support " +
+        "configuring custom resources specs.")
+  }
+
+  /**
+   * Returns the preferred resource of this operation.
+   */
+  def preferredResource: ResourceSpec = javaSet match {
+    case ds: DataSource[_] => ds.preferredResource()
+    case op: Operator[_, _] => op.preferredResource
+    case _ =>
+      throw new UnsupportedOperationException("Operator does not support " +
+        "configuring custom resources specs.")
+  }
+
+  /**
    * Registers an [[org.apache.flink.api.common.aggregators.Aggregator]]
    * for the iteration. Aggregators can be used to maintain simple statistics during the
    * iteration, such as number of elements processed. The aggregators compute global aggregates:

http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 204557d..ae1c39a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -32,6 +32,7 @@ import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -143,6 +144,24 @@ public class DataStream<T> {
 	}
 
 	/**
+	 * Gets the minimum resource for this operator.
+	 *
+	 * @return The minimum resource set for this operator.
+	 */
+	public ResourceSpec minResource() {
+		return transformation.getMinResource();
+	}
+
+	/**
+	 * Gets the preferred resource for this operator.
+	 *
+	 * @return The preferred resource set for this operator.
+	 */
+	public ResourceSpec preferredResource() {
+		return transformation.getPreferredResource();
+	}
+
+	/**
 	 * Gets the type of the stream.
 	 *
 	 * @return The type of the datastream.

http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
index 0c9378b..69e21d6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -114,6 +114,42 @@ public class DataStreamSink<T> {
 	}
 
 	/**
+	 * Sets the minimum and preferred resources for this sink, and the lower and upper resource limits will
+	 * be considered in resource resize feature for future plan.
+	 *
+	 * @param minResource The minimum resource for this sink.
+	 * @param preferredResource The preferred resource for this sink
+	 * @return The sink with set minimum and preferred resources.
+	 */
+	/*
+	public DataStreamSink<T> setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
+		Preconditions.checkNotNull(minResource != null && preferredResource != null,
+				"The min and preferred resources must be not null.");
+		Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource),
+				"The values in resource must be not less than 0 and the preferred resource must be greater than the min resource.");
+
+		transformation.setResource(minResource, preferredResource);
+
+		return this;
+	}*/
+
+	/**
+	 * Sets the resource for this sink, the minimum and preferred resources are the same by default.
+	 *
+	 * @param resource The resource for this sink.
+	 * @return The sink with set minimum and preferred resources.
+	 */
+	/*
+	public DataStreamSink<T> setResource(ResourceSpec resource) {
+		Preconditions.checkNotNull(resource != null, "The resource must be not null.");
+		Preconditions.checkArgument(resource.isValid(), "The resource values must be greater than 0.");
+
+		transformation.setResource(resource, resource);
+
+		return this;
+	}*/
+
+	/**
 	 * Turns off chaining for this operator so thread co-location will not be
 	 * used as an optimization.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 9dd60b7..d856603 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -154,6 +154,42 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
 		return this;
 	}
 
+	/**
+	 * Sets the minimum and preferred resources for this operator, and the lower and upper resource limits will
+	 * be considered in dynamic resource resize feature for future plan.
+	 *
+	 * @param minResource The minimum resource for this operator.
+	 * @param preferredResource The preferred resource for this operator.
+	 * @return The operator with set minimum and preferred resources.
+	 */
+	/*
+	public SingleOutputStreamOperator<T> setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
+		Preconditions.checkArgument(minResource != null && preferredResource != null,
+				"The min and preferred resources must be not null.");
+		Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource),
+				"The values in resource must be not less than 0 and the preferred resource must be greater than the min resource.");
+
+		transformation.setResource(minResource, preferredResource);
+
+		return this;
+	}*/
+
+	/**
+	 * Sets the resource for this operator, the minimum and preferred resources are the same by default.
+	 *
+	 * @param resource The resource for this operator.
+	 * @return The operator with set minimum and preferred resources.
+	 */
+	/*
+	public SingleOutputStreamOperator<T> setResource(ResourceSpec resource) {
+		Preconditions.checkNotNull(resource != null, "The resource must be not null.");
+		Preconditions.checkArgument(resource.isValid(), "The resource values must be greater than 0.");
+
+		transformation.setResource(resource, resource);
+
+		return this;
+	}*/
+
 	private boolean canBeParallel() {
 		return !nonParallel;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 696c04b..fcbc607 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -32,6 +32,7 @@ import java.util.Set;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -413,6 +414,12 @@ public class StreamGraph extends StreamingPlan {
 		}
 	}
 
+	public void setResource(int vertexID, ResourceSpec minResource, ResourceSpec preferredResource) {
+		if (getStreamNode(vertexID) != null) {
+			getStreamNode(vertexID).setResource(minResource, preferredResource);
+		}
+	}
+
 	public void setOneInputStateKey(Integer vertexID, KeySelector<?, ?> keySelector, TypeSerializer<?> keySerializer) {
 		StreamNode node = getStreamNode(vertexID);
 		node.setStatePartitioner1(keySelector);

http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index ddd0515..af92421 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -202,6 +202,10 @@ public class StreamGraphGenerator {
 			streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
 		}
 
+		if (transform.getMinResource() != null && transform.getPreferredResource() != null) {
+			streamGraph.setResource(transform.getId(), transform.getMinResource(), transform.getPreferredResource());
+		}
+
 		return transformedIds;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 19a3699..0bf9adf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.graph;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -48,6 +49,8 @@ public class StreamNode implements Serializable {
 	 * dynamic scaling and the number of key groups used for partitioned state.
 	 */
 	private int maxParallelism;
+	private ResourceSpec minResource;
+	private ResourceSpec preferredResource;
 	private Long bufferTimeout = null;
 	private final String operatorName;
 	private String slotSharingGroup;
@@ -165,6 +168,19 @@ public class StreamNode implements Serializable {
 		this.maxParallelism = maxParallelism;
 	}
 
+	public ResourceSpec getMinResource() {
+		return minResource;
+	}
+
+	public ResourceSpec getPreferredResource() {
+		return preferredResource;
+	}
+
+	public void setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
+		this.minResource = minResource;
+		this.preferredResource = preferredResource;
+	}
+
 	public Long getBufferTimeout() {
 		return bufferTimeout != null ? bufferTimeout : env.getBufferTimeout();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
index 5e1b3e2..1d22454 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.transformations;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -126,6 +127,18 @@ public abstract class StreamTransformation<T> {
 	private int maxParallelism = -1;
 
 	/**
+	 *  The minimum resource for this stream transformation. It defines the lower limit for
+	 *  dynamic resource resize in future plan.
+	 */
+	private ResourceSpec minResource = ResourceSpec.UNKNOWN;
+
+	/**
+	 *  The preferred resource for this stream transformation. It defines the upper limit for
+	 *  dynamic resource resize in future plan.
+	 */
+	private ResourceSpec preferredResource = ResourceSpec.UNKNOWN;
+
+	/**
 	 * User-specified ID for this transformation. This is used to assign the
 	 * same operator ID across job restarts. There is also the automatically
 	 * generated {@link #id}, which is assigned from a static counter. That
@@ -214,6 +227,35 @@ public abstract class StreamTransformation<T> {
 	}
 
 	/**
+	 * Sets the minimum and preferred resources for this stream transformation.
+	 *
+	 * @param minResource The minimum resource of this transformation.
+	 * @param preferredResource The preferred resource of this transformation.
+	 */
+	public void setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
+		this.minResource = minResource;
+		this.preferredResource = preferredResource;
+	}
+
+	/**
+	 * Gets the minimum resource of this stream transformation.
+	 *
+	 * @return The minimum resource of this transformation.
+	 */
+	public ResourceSpec getMinResource() {
+		return minResource;
+	}
+
+	/**
+	 * Gets the preferred resource of this stream transformation.
+	 *
+	 * @return The preferred resource of this transformation.
+	 */
+	public ResourceSpec getPreferredResource() {
+		return preferredResource;
+	}
+
+	/**
 	 * Sets an user provided hash for this operator. This will be used AS IS the create the JobVertexID.
 	 * <p/>
 	 * <p>The user provided hash is an alternative to the generated hashes, that is considered when identifying an

http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index eaac6b8..12af1d4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -501,6 +502,97 @@ public class DataStreamTest {
 		assertEquals(4, env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getParallelism());
 	}
 
+	/**
+	 * Tests whether resource gets set.
+	 */
+	/*
+	@Test
+	public void testResource() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		ResourceSpec minResource1 = new ResourceSpec(1.0, 100);
+		ResourceSpec preferredResource1 = new ResourceSpec(2.0, 200);
+
+		ResourceSpec minResource2 = new ResourceSpec(1.0, 200);
+		ResourceSpec preferredResource2 = new ResourceSpec(2.0, 300);
+
+		ResourceSpec minResource3 = new ResourceSpec(1.0, 300);
+		ResourceSpec preferredResource3 = new ResourceSpec(2.0, 400);
+
+		ResourceSpec minResource4 = new ResourceSpec(1.0, 400);
+		ResourceSpec preferredResource4 = new ResourceSpec(2.0, 500);
+
+		ResourceSpec minResource5 = new ResourceSpec(1.0, 500);
+		ResourceSpec preferredResource5 = new ResourceSpec(2.0, 600);
+
+		ResourceSpec minResource6 = new ResourceSpec(1.0, 600);
+		ResourceSpec preferredResource6 = new ResourceSpec(2.0, 700);
+
+		ResourceSpec minResource7 = new ResourceSpec(1.0, 700);
+		ResourceSpec maxResource7 = new ResourceSpec(2.0, 800);
+
+		DataStream<Long> source1 = env.generateSequence(0, 0).setResource(minResource1, preferredResource1);
+		DataStream<Long> map1 = source1.map(new MapFunction<Long, Long>() {
+			@Override
+			public Long map(Long value) throws Exception {
+				return null;
+			}
+		}).setResource(minResource2, preferredResource2);
+
+		DataStream<Long> source2 = env.generateSequence(0, 0).setResource(minResource3, preferredResource3);
+		DataStream<Long> map2 = source2.map(new MapFunction<Long, Long>() {
+			@Override
+			public Long map(Long value) throws Exception {
+				return null;
+			}
+		}).setResource(minResource4, preferredResource4);
+
+		DataStream<Long> connected = map1.connect(map2)
+				.flatMap(new CoFlatMapFunction<Long, Long, Long>() {
+					@Override
+					public void flatMap1(Long value, Collector<Long> out) throws Exception {
+					}
+					@Override
+					public void flatMap2(Long value, Collector<Long> out) throws Exception {
+					}
+				}).setResource(minResource5, preferredResource5);
+
+		DataStream<Long> windowed = connected
+				.windowAll(GlobalWindows.create())
+				.trigger(PurgingTrigger.of(CountTrigger.of(10)))
+				.fold(0L, new FoldFunction<Long, Long>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public Long fold(Long accumulator, Long value) throws Exception {
+						return null;
+					}
+				}).setResource(minResource6, preferredResource6);
+
+		DataStreamSink<Long> sink = windowed.print().setResource(minResource7, maxResource7);
+
+		assertEquals(minResource1, env.getStreamGraph().getStreamNode(source1.getId()).getMinResource());
+		assertEquals(preferredResource1, env.getStreamGraph().getStreamNode(source1.getId()).getPreferredResource());
+
+		assertEquals(minResource2, env.getStreamGraph().getStreamNode(map1.getId()).getMinResource());
+		assertEquals(preferredResource2, env.getStreamGraph().getStreamNode(map1.getId()).getPreferredResource());
+
+		assertEquals(minResource3, env.getStreamGraph().getStreamNode(source2.getId()).getMinResource());
+		assertEquals(preferredResource3, env.getStreamGraph().getStreamNode(source2.getId()).getPreferredResource());
+
+		assertEquals(minResource4, env.getStreamGraph().getStreamNode(map2.getId()).getMinResource());
+		assertEquals(preferredResource4, env.getStreamGraph().getStreamNode(map2.getId()).getPreferredResource());
+
+		assertEquals(minResource5, env.getStreamGraph().getStreamNode(connected.getId()).getMinResource());
+		assertEquals(preferredResource5, env.getStreamGraph().getStreamNode(connected.getId()).getPreferredResource());
+
+		assertEquals(minResource6, env.getStreamGraph().getStreamNode(windowed.getId()).getMinResource());
+		assertEquals(preferredResource6, env.getStreamGraph().getStreamNode(windowed.getId()).getPreferredResource());
+
+		assertEquals(minResource7, env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getMinResource());
+		assertEquals(maxResource7, env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getPreferredResource());
+	}*/
+
 	@Test
 	public void testTypeInfo() {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index ba92f86..e42fb3f 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.{Internal, Public, PublicEvolving}
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction, Partitioner}
 import org.apache.flink.api.common.io.OutputFormat
+import org.apache.flink.api.common.operators.ResourceSpec
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.java.tuple.{Tuple => JavaTuple}
@@ -145,6 +146,38 @@ class DataStream[T](stream: JavaStream[T]) {
   }
 
   /**
+   * Returns the minimum resource of this operation.
+   */
+  def minResource: ResourceSpec = stream.minResource()
+
+  /**
+   * Returns the preferred resource of this operation.
+   */
+  def preferredResource: ResourceSpec = stream.preferredResource()
+
+  /**
+   * Sets the minimum and preferred resources of this operation.
+   */
+  /*
+  def resource(minResource: ResourceSpec, preferredResource: ResourceSpec) : DataStream[T] =
+    stream match {
+      case stream : SingleOutputStreamOperator[T] => asScalaStream(stream.setResource(
+        minResource, preferredResource))
+      case _ =>
+        throw new UnsupportedOperationException("Operator does not support " +
+          "configuring custom resources specs.")
+      this
+  }*/
+
+  /**
+   * Sets the resource of this operation.
+   */
+  /*
+  def resource(resource: ResourceSpec) : Unit = {
+    this.resource(resource, resource)
+  }*/
+
+  /**
    * Gets the name of the current data stream. This name is
    * used by the visualization and logging during runtime.
    *

http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index adb59f2..841567a 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.scala
 import java.lang
 
 import org.apache.flink.api.common.functions._
+import org.apache.flink.api.common.operators.ResourceSpec
 import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
 import org.apache.flink.streaming.api.functions.ProcessFunction
@@ -34,7 +35,7 @@ import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
 import org.apache.flink.streaming.runtime.partitioner._
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.util.Collector
-import org.junit.Assert.fail
+import org.junit.Assert._
 import org.junit.Test
 
 class DataStreamTest extends StreamingMultipleProgramsTestBase {
@@ -291,6 +292,82 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
     assert(4 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
   }
 
+  /**
+   * Tests whether resource gets set.
+   */
+  /*
+  @Test
+  def testResource() {
+    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val minResource1: ResourceSpec = new ResourceSpec(1.0, 100)
+    val preferredResource1: ResourceSpec = new ResourceSpec(2.0, 200)
+    val minResource2: ResourceSpec = new ResourceSpec(1.0, 200)
+    val preferredResource2: ResourceSpec = new ResourceSpec(2.0, 300)
+    val minResource3: ResourceSpec = new ResourceSpec(1.0, 300)
+    val preferredResource3: ResourceSpec = new ResourceSpec(2.0, 400)
+    val minResource4: ResourceSpec = new ResourceSpec(1.0, 400)
+    val preferredResource4: ResourceSpec = new ResourceSpec(2.0, 500)
+    val minResource5: ResourceSpec = new ResourceSpec(1.0, 500)
+    val preferredResource5: ResourceSpec = new ResourceSpec(2.0, 600)
+    val minResource6: ResourceSpec = new ResourceSpec(1.0, 600)
+    val preferredResource6: ResourceSpec = new ResourceSpec(2.0, 700)
+    val minResource7: ResourceSpec = new ResourceSpec(1.0, 700)
+    val preferredResource7: ResourceSpec = new ResourceSpec(2.0, 800)
+
+    val source1: DataStream[Long] = env.generateSequence(0, 0)
+      .resource(minResource1, preferredResource1)
+    val map1: DataStream[String] = source1.map(x => "")
+      .resource(minResource2, preferredResource2)
+    val source2: DataStream[Long] = env.generateSequence(0, 0)
+      .resource(minResource3, preferredResource3)
+    val map2: DataStream[String] = source2.map(x => "")
+      .resource(minResource4, preferredResource4)
+
+    val connected: DataStream[String] = map1.connect(map2)
+      .flatMap({ (in, out: Collector[(String)]) => }, { (in, out: Collector[(String)]) => })
+      .resource(minResource5, preferredResource5)
+
+    val windowed  = connected
+      .windowAll(GlobalWindows.create())
+      .trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](5)))
+      .fold("")((accumulator: String, value: String) => "")
+      .resource(minResource6, preferredResource6)
+
+    var sink = windowed.print().resource(minResource7, preferredResource7)
+
+    val plan = env.getExecutionPlan
+
+    assertEquals(minResource1, env.getStreamGraph.getStreamNode(source1.getId).
+      getMinResource)
+    assertEquals(preferredResource1, env.getStreamGraph.getStreamNode(source1.getId).
+      getPreferredResource)
+    assertEquals(minResource2, env.getStreamGraph.getStreamNode(map1.getId).
+      getMinResource)
+    assertEquals(preferredResource2, env.getStreamGraph.getStreamNode(map1.getId).
+      getPreferredResource)
+    assertEquals(minResource3, env.getStreamGraph.getStreamNode(source2.getId).
+      getMinResource)
+    assertEquals(preferredResource3, env.getStreamGraph.getStreamNode(source2.getId).
+      getPreferredResource)
+    assertEquals(minResource4, env.getStreamGraph.getStreamNode(map2.getId).
+      getMinResource)
+    assertEquals(preferredResource4, env.getStreamGraph.getStreamNode(map2.getId).
+      getPreferredResource)
+    assertEquals(minResource5, env.getStreamGraph.getStreamNode(connected.getId).
+      getMinResource)
+    assertEquals(preferredResource5, env.getStreamGraph.getStreamNode(connected.getId).
+      getPreferredResource)
+    assertEquals(minResource6, env.getStreamGraph.getStreamNode(windowed.getId).
+      getMinResource)
+    assertEquals(preferredResource6, env.getStreamGraph.getStreamNode(windowed.getId).
+      getPreferredResource)
+    assertEquals(minResource7, env.getStreamGraph.getStreamNode(
+      sink.getPreferredResource.getId).getMinResource)
+    assertEquals(preferredResource7, env.getStreamGraph.getStreamNode(
+      sink.getPreferredResource.getId).getPreferredResource)
+  }*/
+
   @Test
   def testTypeInfo() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment