You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/28 18:36:46 UTC
[03/11] flink git commit: [FLINK-5133] [core] Add new setResource API
for DataStream and DataSet
[FLINK-5133] [core] Add new setResource API for DataStream and DataSet
This introduces the internals, but does not yet make it public in the API.
This closes #3303
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f37ed029
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f37ed029
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f37ed029
Branch: refs/heads/master
Commit: f37ed02909617da4e73a01173b67373369ec3bc8
Parents: c24c7ec
Author: \u6dd8\u6c5f <ta...@alibaba-inc.com>
Authored: Tue Feb 14 12:37:18 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 28 18:59:10 2017 +0100
----------------------------------------------------------------------
.../flink/api/common/operators/Operator.java | 36 ++++++++
.../flink/api/java/operators/DataSink.java | 64 ++++++++++++++
.../api/java/operators/DeltaIteration.java | 65 +++++++++++++-
.../flink/api/java/operators/Operator.java | 67 ++++++++++++++
.../api/java/operators/OperatorTranslation.java | 26 ++++--
.../flink/api/java/operator/OperatorTest.java | 15 ++++
.../apache/flink/optimizer/plan/PlanNode.java | 9 ++
.../org/apache/flink/api/scala/DataSet.scala | 49 ++++++++++-
.../streaming/api/datastream/DataStream.java | 19 ++++
.../api/datastream/DataStreamSink.java | 36 ++++++++
.../datastream/SingleOutputStreamOperator.java | 36 ++++++++
.../flink/streaming/api/graph/StreamGraph.java | 7 ++
.../api/graph/StreamGraphGenerator.java | 4 +
.../flink/streaming/api/graph/StreamNode.java | 16 ++++
.../transformations/StreamTransformation.java | 42 +++++++++
.../flink/streaming/api/DataStreamTest.java | 92 ++++++++++++++++++++
.../flink/streaming/api/scala/DataStream.scala | 33 +++++++
.../streaming/api/scala/DataStreamTest.scala | 79 ++++++++++++++++-
18 files changed, 686 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
index 7e70fd7..a9dedfa 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
@@ -45,6 +45,10 @@ public abstract class Operator<OUT> implements Visitable<Operator<?>> {
private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; // the number of parallel instances to use
+ private ResourceSpec minResource; // the minimum resource of the contract instance.
+
+ private ResourceSpec preferredResource; // the preferred resource of the contract instance.
+
/**
* The return type of the user function.
*/
@@ -184,6 +188,38 @@ public abstract class Operator<OUT> implements Visitable<Operator<?>> {
public void setParallelism(int parallelism) {
this.parallelism = parallelism;
}
+
+ /**
+ * Gets the minimum resource for this contract instance. The minimum resource denotes how many
+ * resources will be needed in the minimum for the user function during the execution.
+ *
+ * @return The minimum resource of this operator.
+ */
+ public ResourceSpec getMinResource() {
+ return this.minResource;
+ }
+
+ /**
+ * Gets the preferred resource for this contract instance. The preferred resource denotes how many
+ * resources will be needed in the maximum for the user function during the execution.
+ *
+ * @return The preferred resource of this operator.
+ */
+ public ResourceSpec getPreferredResource() {
+ return this.preferredResource;
+ }
+
+ /**
+ * Sets the minimum and preferred resources for this contract instance. The resource denotes
+ * how many memories and cpu cores of the user function will be consumed during the execution.
+ *
+ * @param minResource The minimum resource of this operator.
+ * @param preferredResource The preferred resource of this operator.
+ */
+ public void setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
+ this.minResource = minResource;
+ this.preferredResource = preferredResource;
+ }
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
index 8b419d9..3be9cc0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -51,6 +52,10 @@ public class DataSink<T> {
private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
+ private ResourceSpec minResource = ResourceSpec.UNKNOWN;
+
+ private ResourceSpec preferredResource = ResourceSpec.UNKNOWN;
+
private Configuration parameters;
private int[] sortKeyPositions;
@@ -278,4 +283,63 @@ public class DataSink<T> {
return this;
}
+
+ /**
+ * Returns the minimum resource of this data sink. If no minimum resource has been set,
+ * it returns the default empty resource.
+ *
+ * @return The minimum resource of this data sink.
+ */
+ public ResourceSpec getMinResource() {
+ return this.minResource;
+ }
+
+ /**
+ * Returns the preferred resource of this data sink. If no preferred resource has been set,
+ * it returns the default empty resource.
+ *
+ * @return The preferred resource of this data sink.
+ */
+ public ResourceSpec getPreferredResource() {
+ return this.preferredResource;
+ }
+
+ /**
+ * Sets the minimum and preferred resources for this data sink. This overrides the default empty resource.
+ * The minimum resource must be satisfied and the preferred resource specifies the upper bound
+ * for dynamic resource resize.
+ *
+ * @param minResource The minimum resource for this data sink.
+ * @param preferredResource The preferred resource for this data sink.
+ * @return The data sink with set minimum and preferred resources.
+ */
+ /*
+ public DataSink<T> setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
+ Preconditions.checkNotNull(minResource != null && preferredResource != null,
+ "The min and preferred resources must be not null.");
+ Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource),
+ "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource.");
+
+ this.minResource = minResource;
+ this.preferredResource = preferredResource;
+
+ return this;
+ }*/
+
+ /**
+ * Sets the resource for this data sink. This overrides the default empty minimum and preferred resources.
+ *
+ * @param resource The resource for this data sink.
+ * @return The data sink with set minimum and preferred resources.
+ */
+ /*
+ public DataSink<T> setResource(ResourceSpec resource) {
+ Preconditions.checkNotNull(resource != null, "The resource must be not null.");
+ Preconditions.checkArgument(resource.isValid(), "The resource values must be greater than 0.");
+
+ this.minResource = resource;
+ this.preferredResource = resource;
+
+ return this;
+ }*/
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
index b97a9de..cf0a63e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.aggregators.AggregatorRegistry;
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -62,10 +63,13 @@ public class DeltaIteration<ST, WT> {
private String name;
private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
+
+ private ResourceSpec minResource = ResourceSpec.UNKNOWN;
+
+ private ResourceSpec preferredResource = ResourceSpec.UNKNOWN;
private boolean solutionSetUnManaged;
-
public DeltaIteration(ExecutionEnvironment context, TypeInformation<ST> type, DataSet<ST> solutionSet, DataSet<WT> workset, Keys<ST> keys, int maxIterations) {
initialSolutionSet = solutionSet;
initialWorkset = workset;
@@ -192,6 +196,65 @@ public class DeltaIteration<ST, WT> {
public int getParallelism() {
return parallelism;
}
+
+ /**
+ * Sets the minimum and preferred resources for the iteration. This overrides the default empty resource.
+ * The lower and upper resource limits will be considered in dynamic resource resize feature for future plan.
+ *
+ * @param minResource The minimum resource for the iteration.
+ * @param preferredResource The preferred resource for the iteration.
+ * @return The iteration with set minimum and preferred resources.
+ */
+ /*
+ public DeltaIteration<ST, WT> setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
+ Preconditions.checkNotNull(minResource != null && preferredResource != null,
+ "The min and preferred resources must be not null.");
+ Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource),
+ "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource.");
+
+ this.minResource = minResource;
+ this.preferredResource = preferredResource;
+
+ return this;
+ }*/
+
+ /**
+ * Sets the resource for the iteration, and the minimum and preferred resources are the same by default.
+ * The lower and upper resource limits will be considered in dynamic resource resize feature for future plan.
+ *
+ * @param resource The resource for the iteration.
+ * @return The iteration with set minimum and preferred resources.
+ */
+ /*
+ public DeltaIteration<ST, WT> setResource(ResourceSpec resource) {
+ Preconditions.checkNotNull(resource != null, "The resource must be not null.");
+ Preconditions.checkArgument(resource.isValid(), "The values in resource must be not less than 0.");
+
+ this.minResource = resource;
+ this.preferredResource = resource;
+
+ return this;
+ }*/
+
+ /**
+ * Gets the minimum resource from this iteration. If no minimum resource has been set,
+ * it returns the default empty resource.
+ *
+ * @return The minimum resource of the iteration.
+ */
+ public ResourceSpec getMinResource() {
+ return this.minResource;
+ }
+
+ /**
+ * Gets the preferred resource from this iteration. If no preferred resource has been set,
+ * it returns the default empty resource.
+ *
+ * @return The preferred resource of the iteration.
+ */
+ public ResourceSpec getPreferredResource() {
+ return this.preferredResource;
+ }
/**
* Registers an {@link Aggregator} for the iteration. Aggregators can be used to maintain simple statistics during the
http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
index 323d23e..79cae14 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.operators;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -38,6 +39,10 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<
protected int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
+ protected ResourceSpec minResource = ResourceSpec.UNKNOWN;
+
+ protected ResourceSpec preferredResource = ResourceSpec.UNKNOWN;
+
protected Operator(ExecutionEnvironment context, TypeInformation<OUT> resultType) {
super(context, resultType);
}
@@ -71,6 +76,26 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<
}
/**
+ * Returns the minimum resource of this operator. If no minimum resource has been set,
+ * it returns the default empty resource.
+ *
+ * @return The minimum resource of this operator.
+ */
+ public ResourceSpec minResource() {
+ return this.minResource;
+ }
+
+ /**
+ * Returns the preferred resource of this operator. If no preferred resource has been set,
+ * it returns the default empty resource.
+ *
+ * @return The preferred resource of this operator.
+ */
+ public ResourceSpec preferredResource() {
+ return this.preferredResource;
+ }
+
+ /**
* Sets the name of this operator. This overrides the default name, which is either
* a generated description of the operation (such as for example "Aggregate(1:SUM, 2:MIN)")
* or the name the user-defined function or input/output format executed by the operator.
@@ -103,4 +128,46 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<
O returnType = (O) this;
return returnType;
}
+
+ /**
+ * Sets the minimum and preferred resources for this operator. This overrides the default empty resource.
+ * The lower and upper resource limits will be considered in dynamic resource resize feature for future plan.
+ *
+ * @param minResource The minimum resource for this operator.
+ * @param preferredResource The preferred resource for this operator.
+ * @return The operator with set minimum and preferred resources.
+ */
+ /*
+ public O setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
+ Preconditions.checkNotNull(minResource != null && preferredResource != null,
+ "The min and preferred resources must be not null.");
+ Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource),
+ "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource.");
+
+ this.minResource = minResource;
+ this.preferredResource = preferredResource;
+
+ @SuppressWarnings("unchecked")
+ O returnType = (O) this;
+ return returnType;
+ }*/
+
+ /**
+ * Sets the resource for this operator. This overrides the default empty minimum and preferred resources.
+ *
+ * @param resource The resource for this operator.
+ * @return The operator with set minimum and preferred resources.
+ */
+ /*
+ public O setResource(ResourceSpec resource) {
+ Preconditions.checkNotNull(resource != null, "The resource must be not null.");
+ Preconditions.checkArgument(resource.isValid(), "The resource values must be greater than 0.");
+
+ this.minResource = resource;
+ this.preferredResource = resource;
+
+ @SuppressWarnings("unchecked")
+ O returnType = (O) this;
+ return returnType;
+ }*/
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
index 88c9c37..909cd32 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
@@ -63,7 +63,9 @@ public class OperatorTranslation {
// translate the sink itself and connect it to the input
GenericDataSinkBase<T> translatedSink = sink.translateToDataFlow(input);
-
+
+ translatedSink.setResource(sink.getMinResource(), sink.getPreferredResource());
+
return translatedSink;
}
@@ -91,19 +93,31 @@ public class OperatorTranslation {
Operator<T> dataFlowOp;
if (dataSet instanceof DataSource) {
- dataFlowOp = ((DataSource<T>) dataSet).translateToDataFlow();
+ DataSource<T> dataSource = (DataSource<T>) dataSet;
+ dataFlowOp = dataSource.translateToDataFlow();
+ dataFlowOp.setResource(dataSource.minResource(), dataSource.preferredResource());
}
else if (dataSet instanceof SingleInputOperator) {
- dataFlowOp = translateSingleInputOperator((SingleInputOperator<?, ?, ?>) dataSet);
+ SingleInputOperator<?, ?, ?> singleInputOperator = (SingleInputOperator<?, ?, ?>) dataSet;
+ dataFlowOp = translateSingleInputOperator(singleInputOperator);
+ dataFlowOp.setResource(singleInputOperator.minResource, singleInputOperator.preferredResource());
}
else if (dataSet instanceof TwoInputOperator) {
- dataFlowOp = translateTwoInputOperator((TwoInputOperator<?, ?, ?, ?>) dataSet);
+ TwoInputOperator<?, ?, ?, ?> twoInputOperator = (TwoInputOperator<?, ?, ?, ?>) dataSet;
+ dataFlowOp = translateTwoInputOperator(twoInputOperator);
+ dataFlowOp.setResource(twoInputOperator.minResource(), twoInputOperator.preferredResource());
}
else if (dataSet instanceof BulkIterationResultSet) {
- dataFlowOp = translateBulkIteration((BulkIterationResultSet<?>) dataSet);
+ BulkIterationResultSet bulkIterationResultSet = (BulkIterationResultSet<?>) dataSet;
+ dataFlowOp = translateBulkIteration(bulkIterationResultSet);
+ dataFlowOp.setResource(bulkIterationResultSet.getIterationHead().minResource(),
+ bulkIterationResultSet.getIterationHead().preferredResource());
}
else if (dataSet instanceof DeltaIterationResultSet) {
- dataFlowOp = translateDeltaIteration((DeltaIterationResultSet<?, ?>) dataSet);
+ DeltaIterationResultSet deltaIterationResultSet = (DeltaIterationResultSet<?, ?>) dataSet;
+ dataFlowOp = translateDeltaIteration(deltaIterationResultSet);
+ dataFlowOp.setResource(deltaIterationResultSet.getIterationHead().getMinResource(),
+ deltaIterationResultSet.getIterationHead().getPreferredResource());
}
else if (dataSet instanceof DeltaIteration.SolutionSetPlaceHolder || dataSet instanceof DeltaIteration.WorksetPlaceHolder) {
throw new InvalidProgramException("A data set that is part of a delta iteration was used as a sink or action."
http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java
index a69ca3c..992acc9 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.api.java.operator;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.typeutils.ValueTypeInfo;
@@ -45,6 +46,20 @@ public class OperatorTest {
assertEquals(parallelism, operator.getParallelism());
}
+ /*
+ @Test
+ public void testConfigurationOfResource() {
+ Operator operator = new MockOperator();
+
+ // verify explicit change in resource
+ ResourceSpec minResource = new ResourceSpec(1.0, 100, 0, 0, 0);
+ ResourceSpec preferredResource = new ResourceSpec(2.0, 200, 0, 0, 0);
+ operator.setResource(minResource, preferredResource);
+
+ assertEquals(minResource, operator.getMinResource());
+ assertEquals(preferredResource, operator.getPreferredResource());
+ }*/
+
private class MockOperator extends Operator {
public MockOperator() {
super(ExecutionEnvironment.createCollectionsEnvironment(), ValueTypeInfo.NULL_VALUE_TYPE_INFO);
http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
index b30fa36..4ef91b3 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
@@ -19,6 +19,7 @@
package org.apache.flink.optimizer.plan;
import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.operators.util.FieldSet;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.costs.Costs;
@@ -308,6 +309,14 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
public int getParallelism() {
return this.parallelism;
}
+
+ public ResourceSpec getMinResource() {
+ return this.template.getOperator().getMinResource();
+ }
+
+ public ResourceSpec getPreferredResource() {
+ return this.template.getOperator().getPreferredResource();
+ }
public long getGuaranteedAvailableMemory() {
return this.template.getMinimalMemoryAcrossAllSubTasks();
http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 4e7be04..5cfb601 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.accumulators.SerializedListAccumulator
import org.apache.flink.api.common.aggregators.Aggregator
import org.apache.flink.api.common.functions._
import org.apache.flink.api.common.io.{FileOutputFormat, OutputFormat}
-import org.apache.flink.api.common.operators.{Keys, Order}
+import org.apache.flink.api.common.operators.{ResourceSpec, Keys, Order}
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod
@@ -178,6 +178,53 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
}
/**
+ * Sets the minimum and preferred resources of this operation.
+ */
+ /*
+ def resource(minResource: ResourceSpec, preferredResource: ResourceSpec) : Unit = {
+ javaSet match {
+ case ds: DataSource[_] => ds.setResource(minResource, preferredResource)
+ case op: Operator[_, _] => op.setResource(minResource, preferredResource)
+ case di: DeltaIterationResultSet[_, _] =>
+ di.getIterationHead.setResource(minResource, preferredResource)
+ case _ =>
+ throw new UnsupportedOperationException("Operator does not support " +
+ "configuring custom resources specs.")
+ }
+ this
+ }*/
+
+ /**
+ * Sets the resource of this operation.
+ */
+ /*
+ def resource(resource: ResourceSpec) : Unit = {
+ this.resource(resource, resource)
+ }*/
+
+ /**
+ * Returns the minimum resource of this operation.
+ */
+ def minResource: ResourceSpec = javaSet match {
+ case ds: DataSource[_] => ds.minResource()
+ case op: Operator[_, _] => op.minResource
+ case _ =>
+ throw new UnsupportedOperationException("Operator does not support " +
+ "configuring custom resources specs.")
+ }
+
+ /**
+ * Returns the preferred resource of this operation.
+ */
+ def preferredResource: ResourceSpec = javaSet match {
+ case ds: DataSource[_] => ds.preferredResource()
+ case op: Operator[_, _] => op.preferredResource
+ case _ =>
+ throw new UnsupportedOperationException("Operator does not support " +
+ "configuring custom resources specs.")
+ }
+
+ /**
* Registers an [[org.apache.flink.api.common.aggregators.Aggregator]]
* for the iteration. Aggregators can be used to maintain simple statistics during the
* iteration, such as number of elements processed. The aggregators compute global aggregates:
http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 204557d..ae1c39a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -32,6 +32,7 @@ import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -143,6 +144,24 @@ public class DataStream<T> {
}
/**
+ * Gets the minimum resource for this operator.
+ *
+ * @return The minimum resource set for this operator.
+ */
+ public ResourceSpec minResource() {
+ return transformation.getMinResource();
+ }
+
+ /**
+ * Gets the preferred resource for this operator.
+ *
+ * @return The preferred resource set for this operator.
+ */
+ public ResourceSpec preferredResource() {
+ return transformation.getPreferredResource();
+ }
+
+ /**
* Gets the type of the stream.
*
* @return The type of the datastream.
http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
index 0c9378b..69e21d6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -114,6 +114,42 @@ public class DataStreamSink<T> {
}
/**
+ * Sets the minimum and preferred resources for this sink, and the lower and upper resource limits will
+ * be considered in resource resize feature for future plan.
+ *
+ * @param minResource The minimum resource for this sink.
+ * @param preferredResource The preferred resource for this sink
+ * @return The sink with set minimum and preferred resources.
+ */
+ /*
+ public DataStreamSink<T> setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
+ Preconditions.checkNotNull(minResource != null && preferredResource != null,
+ "The min and preferred resources must be not null.");
+ Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource),
+ "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource.");
+
+ transformation.setResource(minResource, preferredResource);
+
+ return this;
+ }*/
+
+ /**
+ * Sets the resource for this sink, the minimum and preferred resources are the same by default.
+ *
+ * @param resource The resource for this sink.
+ * @return The sink with set minimum and preferred resources.
+ */
+ /*
+ public DataStreamSink<T> setResource(ResourceSpec resource) {
+ Preconditions.checkNotNull(resource != null, "The resource must be not null.");
+ Preconditions.checkArgument(resource.isValid(), "The resource values must be greater than 0.");
+
+ transformation.setResource(resource, resource);
+
+ return this;
+ }*/
+
+ /**
* Turns off chaining for this operator so thread co-location will not be
* used as an optimization.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 9dd60b7..d856603 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -154,6 +154,42 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
return this;
}
+ /**
+ * Sets the minimum and preferred resources for this operator, and the lower and upper resource limits will
+ * be considered in dynamic resource resize feature for future plan.
+ *
+ * @param minResource The minimum resource for this operator.
+ * @param preferredResource The preferred resource for this operator.
+ * @return The operator with set minimum and preferred resources.
+ */
+ /*
+ public SingleOutputStreamOperator<T> setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
+ Preconditions.checkArgument(minResource != null && preferredResource != null,
+ "The min and preferred resources must be not null.");
+ Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource),
+ "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource.");
+
+ transformation.setResource(minResource, preferredResource);
+
+ return this;
+ }*/
+
+ /**
+ * Sets the resource for this operator, the minimum and preferred resources are the same by default.
+ *
+ * @param resource The resource for this operator.
+ * @return The operator with set minimum and preferred resources.
+ */
+ /*
+ public SingleOutputStreamOperator<T> setResource(ResourceSpec resource) {
+ Preconditions.checkNotNull(resource != null, "The resource must be not null.");
+ Preconditions.checkArgument(resource.isValid(), "The resource values must be greater than 0.");
+
+ transformation.setResource(resource, resource);
+
+ return this;
+ }*/
+
private boolean canBeParallel() {
return !nonParallel;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 696c04b..fcbc607 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -32,6 +32,7 @@ import java.util.Set;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
@@ -413,6 +414,12 @@ public class StreamGraph extends StreamingPlan {
}
}
+ public void setResource(int vertexID, ResourceSpec minResource, ResourceSpec preferredResource) {
+ if (getStreamNode(vertexID) != null) {
+ getStreamNode(vertexID).setResource(minResource, preferredResource);
+ }
+ }
+
public void setOneInputStateKey(Integer vertexID, KeySelector<?, ?> keySelector, TypeSerializer<?> keySerializer) {
StreamNode node = getStreamNode(vertexID);
node.setStatePartitioner1(keySelector);
http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index ddd0515..af92421 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -202,6 +202,10 @@ public class StreamGraphGenerator {
streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
}
+ if (transform.getMinResource() != null && transform.getPreferredResource() != null) {
+ streamGraph.setResource(transform.getId(), transform.getMinResource(), transform.getPreferredResource());
+ }
+
return transformedIds;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 19a3699..0bf9adf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.graph;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -48,6 +49,8 @@ public class StreamNode implements Serializable {
* dynamic scaling and the number of key groups used for partitioned state.
*/
private int maxParallelism;
+ private ResourceSpec minResource;
+ private ResourceSpec preferredResource;
private Long bufferTimeout = null;
private final String operatorName;
private String slotSharingGroup;
@@ -165,6 +168,19 @@ public class StreamNode implements Serializable {
this.maxParallelism = maxParallelism;
}
+ public ResourceSpec getMinResource() {
+ return minResource;
+ }
+
+ public ResourceSpec getPreferredResource() {
+ return preferredResource;
+ }
+
+ public void setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
+ this.minResource = minResource;
+ this.preferredResource = preferredResource;
+ }
+
public Long getBufferTimeout() {
return bufferTimeout != null ? bufferTimeout : env.getBufferTimeout();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
index 5e1b3e2..1d22454 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.transformations;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -126,6 +127,18 @@ public abstract class StreamTransformation<T> {
private int maxParallelism = -1;
/**
+ * The minimum resource for this stream transformation. It defines the lower limit for
+ * dynamic resource resize in future plan.
+ */
+ private ResourceSpec minResource = ResourceSpec.UNKNOWN;
+
+ /**
+ * The preferred resource for this stream transformation. It defines the upper limit for
+ * dynamic resource resize in future plan.
+ */
+ private ResourceSpec preferredResource = ResourceSpec.UNKNOWN;
+
+ /**
* User-specified ID for this transformation. This is used to assign the
* same operator ID across job restarts. There is also the automatically
* generated {@link #id}, which is assigned from a static counter. That
@@ -214,6 +227,35 @@ public abstract class StreamTransformation<T> {
}
/**
+ * Sets the minimum and preferred resources for this stream transformation.
+ *
+ * @param minResource The minimum resource of this transformation.
+ * @param preferredResource The preferred resource of this transformation.
+ */
+ public void setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
+ this.minResource = minResource;
+ this.preferredResource = preferredResource;
+ }
+
+ /**
+ * Gets the minimum resource of this stream transformation.
+ *
+ * @return The minimum resource of this transformation.
+ */
+ public ResourceSpec getMinResource() {
+ return minResource;
+ }
+
+ /**
+ * Gets the preferred resource of this stream transformation.
+ *
+ * @return The preferred resource of this transformation.
+ */
+ public ResourceSpec getPreferredResource() {
+ return preferredResource;
+ }
+
+ /**
* Sets an user provided hash for this operator. This will be used AS IS the create the JobVertexID.
* <p/>
* <p>The user provided hash is an alternative to the generated hashes, that is considered when identifying an
http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index eaac6b8..12af1d4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -501,6 +502,97 @@ public class DataStreamTest {
assertEquals(4, env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getParallelism());
}
+ /**
+ * Tests whether resource gets set.
+ */
+ /*
+ @Test
+ public void testResource() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ ResourceSpec minResource1 = new ResourceSpec(1.0, 100);
+ ResourceSpec preferredResource1 = new ResourceSpec(2.0, 200);
+
+ ResourceSpec minResource2 = new ResourceSpec(1.0, 200);
+ ResourceSpec preferredResource2 = new ResourceSpec(2.0, 300);
+
+ ResourceSpec minResource3 = new ResourceSpec(1.0, 300);
+ ResourceSpec preferredResource3 = new ResourceSpec(2.0, 400);
+
+ ResourceSpec minResource4 = new ResourceSpec(1.0, 400);
+ ResourceSpec preferredResource4 = new ResourceSpec(2.0, 500);
+
+ ResourceSpec minResource5 = new ResourceSpec(1.0, 500);
+ ResourceSpec preferredResource5 = new ResourceSpec(2.0, 600);
+
+ ResourceSpec minResource6 = new ResourceSpec(1.0, 600);
+ ResourceSpec preferredResource6 = new ResourceSpec(2.0, 700);
+
+ ResourceSpec minResource7 = new ResourceSpec(1.0, 700);
+ ResourceSpec maxResource7 = new ResourceSpec(2.0, 800);
+
+ DataStream<Long> source1 = env.generateSequence(0, 0).setResource(minResource1, preferredResource1);
+ DataStream<Long> map1 = source1.map(new MapFunction<Long, Long>() {
+ @Override
+ public Long map(Long value) throws Exception {
+ return null;
+ }
+ }).setResource(minResource2, preferredResource2);
+
+ DataStream<Long> source2 = env.generateSequence(0, 0).setResource(minResource3, preferredResource3);
+ DataStream<Long> map2 = source2.map(new MapFunction<Long, Long>() {
+ @Override
+ public Long map(Long value) throws Exception {
+ return null;
+ }
+ }).setResource(minResource4, preferredResource4);
+
+ DataStream<Long> connected = map1.connect(map2)
+ .flatMap(new CoFlatMapFunction<Long, Long, Long>() {
+ @Override
+ public void flatMap1(Long value, Collector<Long> out) throws Exception {
+ }
+ @Override
+ public void flatMap2(Long value, Collector<Long> out) throws Exception {
+ }
+ }).setResource(minResource5, preferredResource5);
+
+ DataStream<Long> windowed = connected
+ .windowAll(GlobalWindows.create())
+ .trigger(PurgingTrigger.of(CountTrigger.of(10)))
+ .fold(0L, new FoldFunction<Long, Long>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Long fold(Long accumulator, Long value) throws Exception {
+ return null;
+ }
+ }).setResource(minResource6, preferredResource6);
+
+ DataStreamSink<Long> sink = windowed.print().setResource(minResource7, maxResource7);
+
+ assertEquals(minResource1, env.getStreamGraph().getStreamNode(source1.getId()).getMinResource());
+ assertEquals(preferredResource1, env.getStreamGraph().getStreamNode(source1.getId()).getPreferredResource());
+
+ assertEquals(minResource2, env.getStreamGraph().getStreamNode(map1.getId()).getMinResource());
+ assertEquals(preferredResource2, env.getStreamGraph().getStreamNode(map1.getId()).getPreferredResource());
+
+ assertEquals(minResource3, env.getStreamGraph().getStreamNode(source2.getId()).getMinResource());
+ assertEquals(preferredResource3, env.getStreamGraph().getStreamNode(source2.getId()).getPreferredResource());
+
+ assertEquals(minResource4, env.getStreamGraph().getStreamNode(map2.getId()).getMinResource());
+ assertEquals(preferredResource4, env.getStreamGraph().getStreamNode(map2.getId()).getPreferredResource());
+
+ assertEquals(minResource5, env.getStreamGraph().getStreamNode(connected.getId()).getMinResource());
+ assertEquals(preferredResource5, env.getStreamGraph().getStreamNode(connected.getId()).getPreferredResource());
+
+ assertEquals(minResource6, env.getStreamGraph().getStreamNode(windowed.getId()).getMinResource());
+ assertEquals(preferredResource6, env.getStreamGraph().getStreamNode(windowed.getId()).getPreferredResource());
+
+ assertEquals(minResource7, env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getMinResource());
+ assertEquals(maxResource7, env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getPreferredResource());
+ }*/
+
@Test
public void testTypeInfo() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index ba92f86..e42fb3f 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.{Internal, Public, PublicEvolving}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction, Partitioner}
import org.apache.flink.api.common.io.OutputFormat
+import org.apache.flink.api.common.operators.ResourceSpec
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.tuple.{Tuple => JavaTuple}
@@ -145,6 +146,38 @@ class DataStream[T](stream: JavaStream[T]) {
}
/**
+ * Returns the minimum resource of this operation.
+ */
+ def minResource: ResourceSpec = stream.minResource()
+
+ /**
+ * Returns the preferred resource of this operation.
+ */
+ def preferredResource: ResourceSpec = stream.preferredResource()
+
+ /**
+ * Sets the minimum and preferred resources of this operation.
+ */
+ /*
+ def resource(minResource: ResourceSpec, preferredResource: ResourceSpec) : DataStream[T] =
+ stream match {
+ case stream : SingleOutputStreamOperator[T] => asScalaStream(stream.setResource(
+ minResource, preferredResource))
+ case _ =>
+ throw new UnsupportedOperationException("Operator does not support " +
+ "configuring custom resources specs.")
+ this
+ }*/
+
+ /**
+ * Sets the resource of this operation.
+ */
+ /*
+ def resource(resource: ResourceSpec) : Unit = {
+ this.resource(resource, resource)
+ }*/
+
+ /**
* Gets the name of the current data stream. This name is
* used by the visualization and logging during runtime.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/f37ed029/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index adb59f2..841567a 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.scala
import java.lang
import org.apache.flink.api.common.functions._
+import org.apache.flink.api.common.operators.ResourceSpec
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.functions.ProcessFunction
@@ -34,7 +35,7 @@ import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.streaming.runtime.partitioner._
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.util.Collector
-import org.junit.Assert.fail
+import org.junit.Assert._
import org.junit.Test
class DataStreamTest extends StreamingMultipleProgramsTestBase {
@@ -291,6 +292,82 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
assert(4 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
}
+ /**
+ * Tests whether resource gets set.
+ */
+ /*
+ @Test
+ def testResource() {
+ val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+
+ val minResource1: ResourceSpec = new ResourceSpec(1.0, 100)
+ val preferredResource1: ResourceSpec = new ResourceSpec(2.0, 200)
+ val minResource2: ResourceSpec = new ResourceSpec(1.0, 200)
+ val preferredResource2: ResourceSpec = new ResourceSpec(2.0, 300)
+ val minResource3: ResourceSpec = new ResourceSpec(1.0, 300)
+ val preferredResource3: ResourceSpec = new ResourceSpec(2.0, 400)
+ val minResource4: ResourceSpec = new ResourceSpec(1.0, 400)
+ val preferredResource4: ResourceSpec = new ResourceSpec(2.0, 500)
+ val minResource5: ResourceSpec = new ResourceSpec(1.0, 500)
+ val preferredResource5: ResourceSpec = new ResourceSpec(2.0, 600)
+ val minResource6: ResourceSpec = new ResourceSpec(1.0, 600)
+ val preferredResource6: ResourceSpec = new ResourceSpec(2.0, 700)
+ val minResource7: ResourceSpec = new ResourceSpec(1.0, 700)
+ val preferredResource7: ResourceSpec = new ResourceSpec(2.0, 800)
+
+ val source1: DataStream[Long] = env.generateSequence(0, 0)
+ .resource(minResource1, preferredResource1)
+ val map1: DataStream[String] = source1.map(x => "")
+ .resource(minResource2, preferredResource2)
+ val source2: DataStream[Long] = env.generateSequence(0, 0)
+ .resource(minResource3, preferredResource3)
+ val map2: DataStream[String] = source2.map(x => "")
+ .resource(minResource4, preferredResource4)
+
+ val connected: DataStream[String] = map1.connect(map2)
+ .flatMap({ (in, out: Collector[(String)]) => }, { (in, out: Collector[(String)]) => })
+ .resource(minResource5, preferredResource5)
+
+ val windowed = connected
+ .windowAll(GlobalWindows.create())
+ .trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](5)))
+ .fold("")((accumulator: String, value: String) => "")
+ .resource(minResource6, preferredResource6)
+
+ var sink = windowed.print().resource(minResource7, preferredResource7)
+
+ val plan = env.getExecutionPlan
+
+ assertEquals(minResource1, env.getStreamGraph.getStreamNode(source1.getId).
+ getMinResource)
+ assertEquals(preferredResource1, env.getStreamGraph.getStreamNode(source1.getId).
+ getPreferredResource)
+ assertEquals(minResource2, env.getStreamGraph.getStreamNode(map1.getId).
+ getMinResource)
+ assertEquals(preferredResource2, env.getStreamGraph.getStreamNode(map1.getId).
+ getPreferredResource)
+ assertEquals(minResource3, env.getStreamGraph.getStreamNode(source2.getId).
+ getMinResource)
+ assertEquals(preferredResource3, env.getStreamGraph.getStreamNode(source2.getId).
+ getPreferredResource)
+ assertEquals(minResource4, env.getStreamGraph.getStreamNode(map2.getId).
+ getMinResource)
+ assertEquals(preferredResource4, env.getStreamGraph.getStreamNode(map2.getId).
+ getPreferredResource)
+ assertEquals(minResource5, env.getStreamGraph.getStreamNode(connected.getId).
+ getMinResource)
+ assertEquals(preferredResource5, env.getStreamGraph.getStreamNode(connected.getId).
+ getPreferredResource)
+ assertEquals(minResource6, env.getStreamGraph.getStreamNode(windowed.getId).
+ getMinResource)
+ assertEquals(preferredResource6, env.getStreamGraph.getStreamNode(windowed.getId).
+ getPreferredResource)
+ assertEquals(minResource7, env.getStreamGraph.getStreamNode(
+ sink.getPreferredResource.getId).getMinResource)
+ assertEquals(preferredResource7, env.getStreamGraph.getStreamNode(
+ sink.getPreferredResource.getId).getPreferredResource)
+ }*/
+
@Test
def testTypeInfo() {
val env = StreamExecutionEnvironment.getExecutionEnvironment