You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/28 18:36:50 UTC
[07/11] flink git commit: [FLINK-5133] [core] Followups for
ResourceSpec on DataSet / DataStream API
[FLINK-5133] [core] Followups for ResourceSpec on DataSet / DataStream API
- Correct some use of Preconditions.checkNotNull
- Make 'resources' plural in all cases
- Add comments to why the setters are commented out
- Add @PublicEvolving annotations
- Make the Scala API completeness test match Scala-esk versions of Java Getters
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9912de21
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9912de21
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9912de21
Branch: refs/heads/master
Commit: 9912de21a1053013a220707f8b3868bdbf93aaca
Parents: f37ed02
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Feb 28 10:42:16 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 28 18:59:10 2017 +0100
----------------------------------------------------------------------
.../flink/api/common/operators/Operator.java | 41 ++++---
.../api/common/operators/ResourceSpec.java | 29 ++---
.../flink/api/java/operators/DataSink.java | 109 ++++++++++---------
.../api/java/operators/DeltaIteration.java | 105 +++++++++---------
.../flink/api/java/operators/Operator.java | 101 +++++++++--------
.../api/java/operators/OperatorTranslation.java | 20 ++--
.../apache/flink/optimizer/plan/PlanNode.java | 8 +-
.../org/apache/flink/api/scala/DataSet.scala | 69 ++++++------
.../streaming/api/datastream/DataStream.java | 18 +--
.../api/datastream/DataStreamSink.java | 76 +++++++------
.../datastream/SingleOutputStreamOperator.java | 74 +++++++------
.../flink/streaming/api/graph/StreamGraph.java | 2 +-
.../api/graph/StreamGraphGenerator.java | 4 +-
.../flink/streaming/api/graph/StreamNode.java | 18 +--
.../transformations/StreamTransformation.java | 30 ++---
.../flink/streaming/api/scala/DataStream.scala | 58 +++++-----
.../ScalaAPICompletenessTestBase.scala | 36 +++---
17 files changed, 424 insertions(+), 374 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
index a9dedfa..1905555 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
@@ -16,18 +16,20 @@
* limitations under the License.
*/
-
package org.apache.flink.api.common.operators;
import java.util.List;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Visitable;
+import javax.annotation.Nullable;
+
/**
* Abstract base class for all operators. An operator is a source, sink, or it applies an operation to
* one or more inputs, producing a result.
@@ -45,9 +47,11 @@ public abstract class Operator<OUT> implements Visitable<Operator<?>> {
private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; // the number of parallel instances to use
- private ResourceSpec minResource; // the minimum resource of the contract instance.
+ @Nullable
+ private ResourceSpec minResources; // the minimum resource of the contract instance.
- private ResourceSpec preferredResource; // the preferred resource of the contract instance.
+ @Nullable
+ private ResourceSpec preferredResources; // the preferred resource of the contract instance.
/**
* The return type of the user function.
@@ -190,35 +194,40 @@ public abstract class Operator<OUT> implements Visitable<Operator<?>> {
}
/**
- * Gets the minimum resource for this contract instance. The minimum resource denotes how many
- * resources will be needed in the minimum for the user function during the execution.
+ * Gets the minimum resources for this operator. The minimum resources denotes how many
+ * resources will be needed at least minimum for the operator or user function during the execution.
*
- * @return The minimum resource of this operator.
+ * @return The minimum resources of this operator.
*/
- public ResourceSpec getMinResource() {
- return this.minResource;
+ @Nullable
+ @PublicEvolving
+ public ResourceSpec getMinResources() {
+ return this.minResources;
}
/**
- * Gets the preferred resource for this contract instance. The preferred resource denotes how many
+ * Gets the preferred resources for this contract instance. The preferred resources denote how many
* resources will be needed in the maximum for the user function during the execution.
*
* @return The preferred resource of this operator.
*/
- public ResourceSpec getPreferredResource() {
- return this.preferredResource;
+ @Nullable
+ @PublicEvolving
+ public ResourceSpec getPreferredResources() {
+ return this.preferredResources;
}
/**
* Sets the minimum and preferred resources for this contract instance. The resource denotes
* how many memories and cpu cores of the user function will be consumed during the execution.
*
- * @param minResource The minimum resource of this operator.
- * @param preferredResource The preferred resource of this operator.
+ * @param minResources The minimum resource of this operator.
+ * @param preferredResources The preferred resource of this operator.
*/
- public void setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
- this.minResource = minResource;
- this.preferredResource = preferredResource;
+ @PublicEvolving
+ public void setResource(ResourceSpec minResources, ResourceSpec preferredResources) {
+ this.minResources = minResources;
+ this.preferredResources = preferredResources;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
index 1387508..0ea289a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
@@ -105,14 +105,12 @@ public class ResourceSpec implements Serializable {
* @return The new resource with merged values.
*/
public ResourceSpec merge(ResourceSpec other) {
- ResourceSpec result = new ResourceSpec(
+ return new ResourceSpec(
Math.max(this.cpuCores, other.cpuCores),
this.heapMemoryInMB + other.heapMemoryInMB,
this.directMemoryInMB + other.directMemoryInMB,
this.nativeMemoryInMB + other.nativeMemoryInMB,
this.stateSizeInMB + other.stateSizeInMB);
-
- return result;
}
public double getCpuCores() {
@@ -141,12 +139,8 @@ public class ResourceSpec implements Serializable {
* @return True if all the values are equal or greater than 0, otherwise false.
*/
public boolean isValid() {
- if (this.cpuCores >= 0 && this.heapMemoryInMB >= 0 && this.directMemoryInMB >= 0 &&
- this.nativeMemoryInMB >= 0 && this.stateSizeInMB >= 0) {
- return true;
- } else {
- return false;
- }
+ return (this.cpuCores >= 0 && this.heapMemoryInMB >= 0 && this.directMemoryInMB >= 0 &&
+ this.nativeMemoryInMB >= 0 && this.stateSizeInMB >= 0);
}
/**
@@ -162,11 +156,7 @@ public class ResourceSpec implements Serializable {
int cmp3 = Integer.compare(this.directMemoryInMB, other.directMemoryInMB);
int cmp4 = Integer.compare(this.nativeMemoryInMB, other.nativeMemoryInMB);
int cmp5 = Integer.compare(this.stateSizeInMB, other.stateSizeInMB);
- if (cmp1 <= 0 && cmp2 <= 0 && cmp3 <= 0 && cmp4 <= 0 && cmp5 <= 0) {
- return true;
- } else {
- return false;
- }
+ return (cmp1 <= 0 && cmp2 <= 0 && cmp3 <= 0 && cmp4 <= 0 && cmp5 <= 0);
}
@Override
@@ -186,6 +176,17 @@ public class ResourceSpec implements Serializable {
}
@Override
+ public int hashCode() {
+ final long cpuBits = Double.doubleToLongBits(cpuCores);
+ int result = (int) (cpuBits ^ (cpuBits >>> 32));
+ result = 31 * result + heapMemoryInMB;
+ result = 31 * result + directMemoryInMB;
+ result = 31 * result + nativeMemoryInMB;
+ result = 31 * result + stateSizeInMB;
+ return result;
+ }
+
+ @Override
public String toString() {
return "ResourceSpec{" +
"cpuCores=" + cpuCores +
http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
index 3be9cc0..369e013 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
@@ -52,9 +52,9 @@ public class DataSink<T> {
private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
- private ResourceSpec minResource = ResourceSpec.UNKNOWN;
+ private ResourceSpec minResources = ResourceSpec.UNKNOWN;
- private ResourceSpec preferredResource = ResourceSpec.UNKNOWN;
+ private ResourceSpec preferredResources = ResourceSpec.UNKNOWN;
private Configuration parameters;
@@ -285,61 +285,70 @@ public class DataSink<T> {
}
/**
- * Returns the minimum resource of this data sink. If no minimum resource has been set,
- * it returns the default empty resource.
+ * Returns the minimum resources of this data sink. If no minimum resources have been set,
+ * this returns the default resource profile.
*
- * @return The minimum resource of this data sink.
+ * @return The minimum resources of this data sink.
*/
- public ResourceSpec getMinResource() {
- return this.minResource;
+ @PublicEvolving
+ public ResourceSpec getMinResources() {
+ return this.minResources;
}
/**
- * Returns the preferred resource of this data sink. If no preferred resource has been set,
- * it returns the default empty resource.
+ * Returns the preferred resources of this data sink. If no preferred resources have been set,
+ * this returns the default resource profile.
*
- * @return The preferred resource of this data sink.
+ * @return The preferred resources of this data sink.
*/
- public ResourceSpec getPreferredResource() {
- return this.preferredResource;
+ @PublicEvolving
+ public ResourceSpec getPreferredResources() {
+ return this.preferredResources;
}
- /**
- * Sets the minimum and preferred resources for this data sink. This overrides the default empty resource.
- * The minimum resource must be satisfied and the preferred resource specifies the upper bound
- * for dynamic resource resize.
- *
- * @param minResource The minimum resource for this data sink.
- * @param preferredResource The preferred resource for this data sink.
- * @return The data sink with set minimum and preferred resources.
- */
- /*
- public DataSink<T> setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
- Preconditions.checkNotNull(minResource != null && preferredResource != null,
- "The min and preferred resources must be not null.");
- Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource),
- "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource.");
-
- this.minResource = minResource;
- this.preferredResource = preferredResource;
-
- return this;
- }*/
-
- /**
- * Sets the resource for this data sink. This overrides the default empty minimum and preferred resources.
- *
- * @param resource The resource for this data sink.
- * @return The data sink with set minimum and preferred resources.
- */
- /*
- public DataSink<T> setResource(ResourceSpec resource) {
- Preconditions.checkNotNull(resource != null, "The resource must be not null.");
- Preconditions.checkArgument(resource.isValid(), "The resource values must be greater than 0.");
-
- this.minResource = resource;
- this.preferredResource = resource;
-
- return this;
- }*/
+// ---------------------------------------------------------------------------
+// Fine-grained resource profiles are an incomplete work-in-progress feature
+// The setters are hence commented out at this point.
+// ---------------------------------------------------------------------------
+//
+// /**
+// * Sets the minimum and preferred resources for this data sink. This overrides the default empty resource.
+// * The minimum resource must be satisfied and the preferred resource specifies the upper bound
+// * for dynamic resource resize.
+// *
+// * @param minResources The minimum resource for this data sink.
+// * @param preferredResources The preferred resource for this data sink.
+// * @return The data sink with set minimum and preferred resources.
+// */
+// @PublicEvolving
+// public DataSink<T> setResources(ResourceSpec minResources, ResourceSpec preferredResources) {
+// Preconditions.checkNotNull(minResources, "The min resources must be not null.");
+// Preconditions.checkNotNull(preferredResources, "The preferred resources must be not null.");
+// Preconditions.checkArgument(minResources.isValid() &&
+// preferredResources.isValid() && minResources.lessThanOrEqual(preferredResources),
+// "The values in resource must be not less than 0 and the preferred " +
+// "resource must be greater than the min resource.");
+//
+// this.minResources = minResources;
+// this.preferredResources = preferredResources;
+//
+// return this;
+// }
+//
+// /**
+// * Sets the resources for this data sink. This overrides the default resource profile.
+// *
+// * @param resources The resources for this data sink.
+// * @return The data sink with set minimum and preferred resources.
+// */
+// @PublicEvolving
+// public DataSink<T> setResources(ResourceSpec resources) {
+// Preconditions.checkNotNull(resources, "The resources must be not null.");
+// Preconditions.checkArgument(resources.isValid(), "The resource values must be greater than 0.");
+//
+// this.minResources = resources;
+// this.preferredResources = resources;
+//
+// return this;
+// }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
index cf0a63e..3d327e9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
@@ -64,9 +64,9 @@ public class DeltaIteration<ST, WT> {
private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
- private ResourceSpec minResource = ResourceSpec.UNKNOWN;
+ private ResourceSpec minResources = ResourceSpec.UNKNOWN;
- private ResourceSpec preferredResource = ResourceSpec.UNKNOWN;
+ private ResourceSpec preferredResources = ResourceSpec.UNKNOWN;
private boolean solutionSetUnManaged;
@@ -197,65 +197,72 @@ public class DeltaIteration<ST, WT> {
return parallelism;
}
- /**
- * Sets the minimum and preferred resources for the iteration. This overrides the default empty resource.
- * The lower and upper resource limits will be considered in dynamic resource resize feature for future plan.
- *
- * @param minResource The minimum resource for the iteration.
- * @param preferredResource The preferred resource for the iteration.
- * @return The iteration with set minimum and preferred resources.
- */
- /*
- public DeltaIteration<ST, WT> setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
- Preconditions.checkNotNull(minResource != null && preferredResource != null,
- "The min and preferred resources must be not null.");
- Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource),
- "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource.");
-
- this.minResource = minResource;
- this.preferredResource = preferredResource;
-
- return this;
- }*/
+// ---------------------------------------------------------------------------
+// Fine-grained resource profiles are an incomplete work-in-progress feature
+// The setters are hence commented out at this point.
+// ---------------------------------------------------------------------------
+//
+// /**
+// * Sets the minimum and preferred resources for the iteration. This overrides the default empty resource.
+// * The lower and upper resource limits will be considered in dynamic resource resize feature for future plan.
+// *
+// * @param minResources The minimum resource for the iteration.
+// * @param preferredResources The preferred resource for the iteration.
+// * @return The iteration with set minimum and preferred resources.
+// */
+// @PublicEvolving
+// public DeltaIteration<ST, WT> setResource(ResourceSpec minResources, ResourceSpec preferredResources) {
+// Preconditions.checkNotNull(minResources, "The min resources must be not null.");
+// Preconditions.checkNotNull(preferredResources, "The preferred resources must be not null.");
+// Preconditions.checkArgument(minResources.isValid() && preferredResources.isValid() && minResources.lessThanOrEqual(preferredResources),
+// "The values in resources must be not less than 0 and the preferred resources must be greater than the min resources.");
+//
+// this.minResources = minResources;
+// this.preferredResources = preferredResources;
+//
+// return this;
+// }
+//
+// /**
+// * Sets the resource for the iteration, and the minimum and preferred resources are the same by default.
+// * The lower and upper resource limits will be considered in dynamic resource resize feature for future plan.
+// *
+// * @param resources The resource for the iteration.
+// * @return The iteration with set minimum and preferred resources.
+// */
+// @PublicEvolving
+// public DeltaIteration<ST, WT> setResource(ResourceSpec resources) {
+// Preconditions.checkNotNull(resources, "The resources must be not null.");
+// Preconditions.checkArgument(resources.isValid(), "The values in resource must be not less than 0.");
+//
+// this.minResources = resources;
+// this.preferredResources = resources;
+//
+// return this;
+// }
/**
- * Sets the resource for the iteration, and the minimum and preferred resources are the same by default.
- * The lower and upper resource limits will be considered in dynamic resource resize feature for future plan.
- *
- * @param resource The resource for the iteration.
- * @return The iteration with set minimum and preferred resources.
- */
- /*
- public DeltaIteration<ST, WT> setResource(ResourceSpec resource) {
- Preconditions.checkNotNull(resource != null, "The resource must be not null.");
- Preconditions.checkArgument(resource.isValid(), "The values in resource must be not less than 0.");
-
- this.minResource = resource;
- this.preferredResource = resource;
-
- return this;
- }*/
-
- /**
- * Gets the minimum resource from this iteration. If no minimum resource has been set,
+ * Gets the minimum resources from this iteration. If no minimum resources have been set,
* it returns the default empty resource.
*
- * @return The minimum resource of the iteration.
+ * @return The minimum resources of the iteration.
*/
- public ResourceSpec getMinResource() {
- return this.minResource;
+ @PublicEvolving
+ public ResourceSpec getMinResources() {
+ return this.minResources;
}
/**
- * Gets the preferred resource from this iteration. If no preferred resource has been set,
+ * Gets the preferred resources from this iteration. If no preferred resources have been set,
* it returns the default empty resource.
*
- * @return The preferred resource of the iteration.
+ * @return The preferred resources of the iteration.
*/
- public ResourceSpec getPreferredResource() {
- return this.preferredResource;
+ @PublicEvolving
+ public ResourceSpec getPreferredResources() {
+ return this.preferredResources;
}
-
+
/**
* Registers an {@link Aggregator} for the iteration. Aggregators can be used to maintain simple statistics during the
* iteration, such as number of elements processed. The aggregators compute global aggregates: After each iteration step,
http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
index 79cae14..6ae59dd 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
@@ -39,9 +39,10 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<
protected int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
- protected ResourceSpec minResource = ResourceSpec.UNKNOWN;
+ protected ResourceSpec minResources = ResourceSpec.UNKNOWN;
+
+ protected ResourceSpec preferredResources = ResourceSpec.UNKNOWN;
- protected ResourceSpec preferredResource = ResourceSpec.UNKNOWN;
protected Operator(ExecutionEnvironment context, TypeInformation<OUT> resultType) {
super(context, resultType);
@@ -81,8 +82,8 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<
*
* @return The minimum resource of this operator.
*/
- public ResourceSpec minResource() {
- return this.minResource;
+ public ResourceSpec getMinResources() {
+ return this.minResources;
}
/**
@@ -91,8 +92,8 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<
*
* @return The preferred resource of this operator.
*/
- public ResourceSpec preferredResource() {
- return this.preferredResource;
+ public ResourceSpec getPreferredResources() {
+ return this.preferredResources;
}
/**
@@ -129,45 +130,51 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<
return returnType;
}
- /**
- * Sets the minimum and preferred resources for this operator. This overrides the default empty resource.
- * The lower and upper resource limits will be considered in dynamic resource resize feature for future plan.
- *
- * @param minResource The minimum resource for this operator.
- * @param preferredResource The preferred resource for this operator.
- * @return The operator with set minimum and preferred resources.
- */
- /*
- public O setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
- Preconditions.checkNotNull(minResource != null && preferredResource != null,
- "The min and preferred resources must be not null.");
- Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource),
- "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource.");
-
- this.minResource = minResource;
- this.preferredResource = preferredResource;
-
- @SuppressWarnings("unchecked")
- O returnType = (O) this;
- return returnType;
- }*/
-
- /**
- * Sets the resource for this operator. This overrides the default empty minimum and preferred resources.
- *
- * @param resource The resource for this operator.
- * @return The operator with set minimum and preferred resources.
- */
- /*
- public O setResource(ResourceSpec resource) {
- Preconditions.checkNotNull(resource != null, "The resource must be not null.");
- Preconditions.checkArgument(resource.isValid(), "The resource values must be greater than 0.");
-
- this.minResource = resource;
- this.preferredResource = resource;
-
- @SuppressWarnings("unchecked")
- O returnType = (O) this;
- return returnType;
- }*/
+// ---------------------------------------------------------------------------
+// Fine-grained resource profiles are an incomplete work-in-progress feature
+// The setters are hence commented out at this point.
+// ---------------------------------------------------------------------------
+//
+// /**
+// * Sets the minimum and preferred resources for this operator. This overrides the default empty resource.
+// * The lower and upper resource limits will be considered in dynamic resource resize feature for future plan.
+// *
+// * @param minResources The minimum resource for this operator.
+// * @param preferredResources The preferred resource for this operator.
+// * @return The operator with set minimum and preferred resources.
+// */
+// @PublicEvolving
+// public O setResources(ResourceSpec minResources, ResourceSpec preferredResources) {
+// Preconditions.checkNotNull(minResources, "The min resources must be not null.");
+// Preconditions.checkNotNull(preferredResources, "The preferred resources must be not null.");
+//
+// Preconditions.checkArgument(minResources.isValid() && preferredResources.isValid() && minResources.lessThanOrEqual(preferredResources),
+// "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource.");
+//
+// this.minResources = minResources;
+// this.preferredResources = preferredResources;
+//
+// @SuppressWarnings("unchecked")
+// O returnType = (O) this;
+// return returnType;
+// }
+//
+// /**
+// * Sets the resources for this operator. This overrides the default minimum and preferred resources.
+// *
+// * @param resources The resource for this operator.
+// * @return The operator with set minimum and preferred resources.
+// */
+// @PublicEvolving
+// public O setResources(ResourceSpec resources) {
+// Preconditions.checkNotNull(resources, "The resource must be not null.");
+// Preconditions.checkArgument(resources.isValid(), "The resource values must be greater than 0.");
+//
+// this.minResources = resources;
+// this.preferredResources = resources;
+//
+// @SuppressWarnings("unchecked")
+// O returnType = (O) this;
+// return returnType;
+// }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
index 909cd32..3bffd8b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
@@ -64,7 +64,7 @@ public class OperatorTranslation {
// translate the sink itself and connect it to the input
GenericDataSinkBase<T> translatedSink = sink.translateToDataFlow(input);
- translatedSink.setResource(sink.getMinResource(), sink.getPreferredResource());
+ translatedSink.setResource(sink.getMinResources(), sink.getPreferredResources());
return translatedSink;
}
@@ -95,29 +95,29 @@ public class OperatorTranslation {
if (dataSet instanceof DataSource) {
DataSource<T> dataSource = (DataSource<T>) dataSet;
dataFlowOp = dataSource.translateToDataFlow();
- dataFlowOp.setResource(dataSource.minResource(), dataSource.preferredResource());
+ dataFlowOp.setResource(dataSource.getMinResources(), dataSource.getPreferredResources());
}
else if (dataSet instanceof SingleInputOperator) {
SingleInputOperator<?, ?, ?> singleInputOperator = (SingleInputOperator<?, ?, ?>) dataSet;
dataFlowOp = translateSingleInputOperator(singleInputOperator);
- dataFlowOp.setResource(singleInputOperator.minResource, singleInputOperator.preferredResource());
+ dataFlowOp.setResource(singleInputOperator.getMinResources(), singleInputOperator.getPreferredResources());
}
else if (dataSet instanceof TwoInputOperator) {
TwoInputOperator<?, ?, ?, ?> twoInputOperator = (TwoInputOperator<?, ?, ?, ?>) dataSet;
dataFlowOp = translateTwoInputOperator(twoInputOperator);
- dataFlowOp.setResource(twoInputOperator.minResource(), twoInputOperator.preferredResource());
+ dataFlowOp.setResource(twoInputOperator.getMinResources(), twoInputOperator.getPreferredResources());
}
else if (dataSet instanceof BulkIterationResultSet) {
- BulkIterationResultSet bulkIterationResultSet = (BulkIterationResultSet<?>) dataSet;
+ BulkIterationResultSet<?> bulkIterationResultSet = (BulkIterationResultSet<?>) dataSet;
dataFlowOp = translateBulkIteration(bulkIterationResultSet);
- dataFlowOp.setResource(bulkIterationResultSet.getIterationHead().minResource(),
- bulkIterationResultSet.getIterationHead().preferredResource());
+ dataFlowOp.setResource(bulkIterationResultSet.getIterationHead().getMinResources(),
+ bulkIterationResultSet.getIterationHead().getPreferredResources());
}
else if (dataSet instanceof DeltaIterationResultSet) {
- DeltaIterationResultSet deltaIterationResultSet = (DeltaIterationResultSet<?, ?>) dataSet;
+ DeltaIterationResultSet<?, ?> deltaIterationResultSet = (DeltaIterationResultSet<?, ?>) dataSet;
dataFlowOp = translateDeltaIteration(deltaIterationResultSet);
- dataFlowOp.setResource(deltaIterationResultSet.getIterationHead().getMinResource(),
- deltaIterationResultSet.getIterationHead().getPreferredResource());
+ dataFlowOp.setResource(deltaIterationResultSet.getIterationHead().getMinResources(),
+ deltaIterationResultSet.getIterationHead().getPreferredResources());
}
else if (dataSet instanceof DeltaIteration.SolutionSetPlaceHolder || dataSet instanceof DeltaIteration.WorksetPlaceHolder) {
throw new InvalidProgramException("A data set that is part of a delta iteration was used as a sink or action."
http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
index 4ef91b3..723c532 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
@@ -310,12 +310,12 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
return this.parallelism;
}
- public ResourceSpec getMinResource() {
- return this.template.getOperator().getMinResource();
+ public ResourceSpec getMinResources() {
+ return this.template.getOperator().getMinResources();
}
- public ResourceSpec getPreferredResource() {
- return this.template.getOperator().getPreferredResource();
+ public ResourceSpec getPreferredResources() {
+ return this.template.getOperator().getPreferredResources();
}
public long getGuaranteedAvailableMemory() {
http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 5cfb601..bfe7567 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -177,48 +177,55 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
"parallelism.")
}
- /**
- * Sets the minimum and preferred resources of this operation.
- */
- /*
- def resource(minResource: ResourceSpec, preferredResource: ResourceSpec) : Unit = {
- javaSet match {
- case ds: DataSource[_] => ds.setResource(minResource, preferredResource)
- case op: Operator[_, _] => op.setResource(minResource, preferredResource)
- case di: DeltaIterationResultSet[_, _] =>
- di.getIterationHead.setResource(minResource, preferredResource)
- case _ =>
- throw new UnsupportedOperationException("Operator does not support " +
- "configuring custom resources specs.")
- }
- this
- }*/
- /**
- * Sets the resource of this operation.
- */
- /*
- def resource(resource: ResourceSpec) : Unit = {
- this.resource(resource, resource)
- }*/
+// ---------------------------------------------------------------------------
+// Fine-grained resource profiles are an incomplete work-in-progress feature
+// The setters are hence commented out at this point.
+// ---------------------------------------------------------------------------
+// /**
+// * Sets the minimum and preferred resources of this operation.
+// */
+// @PublicEvolving
+// def resources(minResources: ResourceSpec, preferredResources: ResourceSpec) : Unit = {
+// javaSet match {
+// case ds: DataSource[_] => ds.setResources(minResources, preferredResources)
+// case op: Operator[_, _] => op.setResources(minResources, preferredResources)
+// case di: DeltaIterationResultSet[_, _] =>
+// di.getIterationHead.setResources(minResources, preferredResources)
+// case _ =>
+// throw new UnsupportedOperationException("Operator does not support " +
+// "configuring custom resources specs.")
+// }
+// this
+// }
+//
+// /**
+// * Sets the resource of this operation.
+// */
+// @PublicEvolving
+// def resources(resources: ResourceSpec) : Unit = {
+// this.resources(resources, resources)
+// }
/**
- * Returns the minimum resource of this operation.
+ * Returns the minimum resources of this operation.
*/
- def minResource: ResourceSpec = javaSet match {
- case ds: DataSource[_] => ds.minResource()
- case op: Operator[_, _] => op.minResource
+ @PublicEvolving
+ def minResources: ResourceSpec = javaSet match {
+ case ds: DataSource[_] => ds.getMinResources()
+ case op: Operator[_, _] => op.getMinResources()
case _ =>
throw new UnsupportedOperationException("Operator does not support " +
"configuring custom resources specs.")
}
/**
- * Returns the preferred resource of this operation.
+ * Returns the preferred resources of this operation.
*/
- def preferredResource: ResourceSpec = javaSet match {
- case ds: DataSource[_] => ds.preferredResource()
- case op: Operator[_, _] => op.preferredResource
+ @PublicEvolving
+ def preferredResources: ResourceSpec = javaSet match {
+ case ds: DataSource[_] => ds.getPreferredResources()
+ case op: Operator[_, _] => op.getPreferredResources()
case _ =>
throw new UnsupportedOperationException("Operator does not support " +
"configuring custom resources specs.")
http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index ae1c39a..c443758 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -144,21 +144,23 @@ public class DataStream<T> {
}
/**
- * Gets the minimum resource for this operator.
+ * Gets the minimum resources for this operator.
*
- * @return The minimum resource set for this operator.
+ * @return The minimum resources set for this operator.
*/
- public ResourceSpec minResource() {
- return transformation.getMinResource();
+ @PublicEvolving
+ public ResourceSpec getMinResources() {
+ return transformation.getMinResources();
}
/**
- * Gets the preferred resource for this operator.
+ * Gets the preferred resources for this operator.
*
- * @return The preferred resource set for this operator.
+ * @return The preferred resources set for this operator.
*/
- public ResourceSpec preferredResource() {
- return transformation.getPreferredResource();
+ @PublicEvolving
+ public ResourceSpec getPreferredResources() {
+ return transformation.getPreferredResources();
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
index 69e21d6..39d81c6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -32,7 +32,7 @@ import org.apache.flink.streaming.api.transformations.SinkTransformation;
@Public
public class DataStreamSink<T> {
- SinkTransformation<T> transformation;
+ private final SinkTransformation<T> transformation;
@SuppressWarnings("unchecked")
protected DataStreamSink(DataStream<T> inputStream, StreamSink<T> operator) {
@@ -113,41 +113,45 @@ public class DataStreamSink<T> {
return this;
}
- /**
- * Sets the minimum and preferred resources for this sink, and the lower and upper resource limits will
- * be considered in resource resize feature for future plan.
- *
- * @param minResource The minimum resource for this sink.
- * @param preferredResource The preferred resource for this sink
- * @return The sink with set minimum and preferred resources.
- */
- /*
- public DataStreamSink<T> setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
- Preconditions.checkNotNull(minResource != null && preferredResource != null,
- "The min and preferred resources must be not null.");
- Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource),
- "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource.");
-
- transformation.setResource(minResource, preferredResource);
-
- return this;
- }*/
-
- /**
- * Sets the resource for this sink, the minimum and preferred resources are the same by default.
- *
- * @param resource The resource for this sink.
- * @return The sink with set minimum and preferred resources.
- */
- /*
- public DataStreamSink<T> setResource(ResourceSpec resource) {
- Preconditions.checkNotNull(resource != null, "The resource must be not null.");
- Preconditions.checkArgument(resource.isValid(), "The resource values must be greater than 0.");
-
- transformation.setResource(resource, resource);
-
- return this;
- }*/
+// ---------------------------------------------------------------------------
+// Fine-grained resource profiles are an incomplete work-in-progress feature
+// The setters are hence commented out at this point.
+// ---------------------------------------------------------------------------
+// /**
+// * Sets the minimum and preferred resources for this sink, and the lower and upper resource limits will
+// * be considered in resource resize feature for future plan.
+// *
+// * @param minResources The minimum resources for this sink.
+// * @param preferredResources The preferred resources for this sink
+// * @return The sink with set minimum and preferred resources.
+// */
+// @PublicEvolving
+// public DataStreamSink<T> setResources(ResourceSpec minResources, ResourceSpec preferredResources) {
+// Preconditions.checkNotNull(minResources, "The min resources must be not null.");
+// Preconditions.checkNotNull(preferredResources, "The preferred resources must be not null.");
+// Preconditions.checkArgument(minResources.isValid() && preferredResources.isValid() && minResources.lessThanOrEqual(preferredResources),
+// "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource.");
+//
+// transformation.setResources(minResources, preferredResources);
+//
+// return this;
+// }
+//
+// /**
+// * Sets the resource for this sink, the minimum and preferred resources are the same by default.
+// *
+// * @param resources The resource for this sink.
+// * @return The sink with set minimum and preferred resources.
+// */
+// @PublicEvolving
+// public DataStreamSink<T> setResources(ResourceSpec resources) {
+// Preconditions.checkNotNull(resources, "The resource must be not null.");
+// Preconditions.checkArgument(resources.isValid(), "The resource values must be greater than 0.");
+//
+// transformation.setResources(resources, resources);
+//
+// return this;
+// }
/**
* Turns off chaining for this operator so thread co-location will not be
http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index d856603..859c6d5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -154,41 +154,45 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
return this;
}
- /**
- * Sets the minimum and preferred resources for this operator, and the lower and upper resource limits will
- * be considered in dynamic resource resize feature for future plan.
- *
- * @param minResource The minimum resource for this operator.
- * @param preferredResource The preferred resource for this operator.
- * @return The operator with set minimum and preferred resources.
- */
- /*
- public SingleOutputStreamOperator<T> setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
- Preconditions.checkArgument(minResource != null && preferredResource != null,
- "The min and preferred resources must be not null.");
- Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource),
- "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource.");
-
- transformation.setResource(minResource, preferredResource);
-
- return this;
- }*/
-
- /**
- * Sets the resource for this operator, the minimum and preferred resources are the same by default.
- *
- * @param resource The resource for this operator.
- * @return The operator with set minimum and preferred resources.
- */
- /*
- public SingleOutputStreamOperator<T> setResource(ResourceSpec resource) {
- Preconditions.checkNotNull(resource != null, "The resource must be not null.");
- Preconditions.checkArgument(resource.isValid(), "The resource values must be greater than 0.");
-
- transformation.setResource(resource, resource);
-
- return this;
- }*/
+// ---------------------------------------------------------------------------
+// Fine-grained resource profiles are an incomplete work-in-progress feature
+// The setters are hence commented out at this point.
+// ---------------------------------------------------------------------------
+// /**
+// * Sets the minimum and preferred resources for this operator, and the lower and upper resource limits will
+// * be considered in dynamic resource resize feature for future plan.
+// *
+// * @param minResources The minimum resources for this operator.
+// * @param preferredResources The preferred resources for this operator.
+// * @return The operator with set minimum and preferred resources.
+// */
+// @PublicEvolving
+// public SingleOutputStreamOperator<T> setResources(ResourceSpec minResources, ResourceSpec preferredResources) {
+// Preconditions.checkNotNull(minResources, "The min resources must be not null.");
+// Preconditions.checkNotNull(preferredResources, "The preferred resources must be not null.");
+// Preconditions.checkArgument(minResources.isValid() && preferredResources.isValid() && minResource.lessThanOrEqual(preferredResources),
+// "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource.");
+//
+// transformation.setResources(minResources, preferredResources);
+//
+// return this;
+// }
+//
+// /**
+// * Sets the resources for this operator, the minimum and preferred resources are the same by default.
+// *
+// * @param resources The resources for this operator.
+// * @return The operator with set minimum and preferred resources.
+// */
+// @PublicEvolving
+// public SingleOutputStreamOperator<T> setResources(ResourceSpec resources) {
+// Preconditions.checkNotNull(resources, "The resource must be not null.");
+// Preconditions.checkArgument(resources.isValid(), "The resource values must be greater than 0.");
+//
+// transformation.setResources(resources, resources);
+//
+// return this;
+// }
private boolean canBeParallel() {
return !nonParallel;
http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index fcbc607..a87e63d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -416,7 +416,7 @@ public class StreamGraph extends StreamingPlan {
public void setResource(int vertexID, ResourceSpec minResource, ResourceSpec preferredResource) {
if (getStreamNode(vertexID) != null) {
- getStreamNode(vertexID).setResource(minResource, preferredResource);
+ getStreamNode(vertexID).setResources(minResource, preferredResource);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index af92421..f55ff47 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -202,8 +202,8 @@ public class StreamGraphGenerator {
streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
}
- if (transform.getMinResource() != null && transform.getPreferredResource() != null) {
- streamGraph.setResource(transform.getId(), transform.getMinResource(), transform.getPreferredResource());
+ if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
+ streamGraph.setResource(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
}
return transformedIds;
http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 0bf9adf..2d2e1e75 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -49,8 +49,8 @@ public class StreamNode implements Serializable {
* dynamic scaling and the number of key groups used for partitioned state.
*/
private int maxParallelism;
- private ResourceSpec minResource;
- private ResourceSpec preferredResource;
+ private ResourceSpec minResources;
+ private ResourceSpec preferredResources;
private Long bufferTimeout = null;
private final String operatorName;
private String slotSharingGroup;
@@ -168,17 +168,17 @@ public class StreamNode implements Serializable {
this.maxParallelism = maxParallelism;
}
- public ResourceSpec getMinResource() {
- return minResource;
+ public ResourceSpec getMinResources() {
+ return minResources;
}
- public ResourceSpec getPreferredResource() {
- return preferredResource;
+ public ResourceSpec getPreferredResources() {
+ return preferredResources;
}
- public void setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
- this.minResource = minResource;
- this.preferredResource = preferredResource;
+ public void setResources(ResourceSpec minResources, ResourceSpec preferredResources) {
+ this.minResources = minResources;
+ this.preferredResources = preferredResources;
}
public Long getBufferTimeout() {
http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
index 1d22454..24b5736 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
@@ -30,6 +30,8 @@ import org.apache.flink.util.Preconditions;
import java.util.Collection;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* A {@code StreamTransformation} represents the operation that creates a
* {@link org.apache.flink.streaming.api.datastream.DataStream}. Every
@@ -127,16 +129,16 @@ public abstract class StreamTransformation<T> {
private int maxParallelism = -1;
/**
- * The minimum resource for this stream transformation. It defines the lower limit for
- * dynamic resource resize in future plan.
+ * The minimum resources for this stream transformation. It defines the lower limit for
+ * dynamic resources resize in future plan.
*/
- private ResourceSpec minResource = ResourceSpec.UNKNOWN;
+ private ResourceSpec minResources = ResourceSpec.UNKNOWN;
/**
- * The preferred resource for this stream transformation. It defines the upper limit for
+ * The preferred resources for this stream transformation. It defines the upper limit for
* dynamic resource resize in future plan.
*/
- private ResourceSpec preferredResource = ResourceSpec.UNKNOWN;
+ private ResourceSpec preferredResources = ResourceSpec.UNKNOWN;
/**
* User-specified ID for this transformation. This is used to assign the
@@ -229,12 +231,12 @@ public abstract class StreamTransformation<T> {
/**
* Sets the minimum and preferred resources for this stream transformation.
*
- * @param minResource The minimum resource of this transformation.
- * @param preferredResource The preferred resource of this transformation.
+ * @param minResources The minimum resource of this transformation.
+ * @param preferredResources The preferred resource of this transformation.
*/
- public void setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
- this.minResource = minResource;
- this.preferredResource = preferredResource;
+ public void setResources(ResourceSpec minResources, ResourceSpec preferredResources) {
+ this.minResources = checkNotNull(minResources);
+ this.preferredResources = checkNotNull(preferredResources);
}
/**
@@ -242,8 +244,8 @@ public abstract class StreamTransformation<T> {
*
* @return The minimum resource of this transformation.
*/
- public ResourceSpec getMinResource() {
- return minResource;
+ public ResourceSpec getMinResources() {
+ return minResources;
}
/**
@@ -251,8 +253,8 @@ public abstract class StreamTransformation<T> {
*
* @return The preferred resource of this transformation.
*/
- public ResourceSpec getPreferredResource() {
- return preferredResource;
+ public ResourceSpec getPreferredResources() {
+ return preferredResources;
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index e42fb3f..35e1f23 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -146,36 +146,42 @@ class DataStream[T](stream: JavaStream[T]) {
}
/**
- * Returns the minimum resource of this operation.
+ * Returns the minimum resources of this operation.
*/
- def minResource: ResourceSpec = stream.minResource()
-
- /**
- * Returns the preferred resource of this operation.
- */
- def preferredResource: ResourceSpec = stream.preferredResource()
-
- /**
- * Sets the minimum and preferred resources of this operation.
- */
- /*
- def resource(minResource: ResourceSpec, preferredResource: ResourceSpec) : DataStream[T] =
- stream match {
- case stream : SingleOutputStreamOperator[T] => asScalaStream(stream.setResource(
- minResource, preferredResource))
- case _ =>
- throw new UnsupportedOperationException("Operator does not support " +
- "configuring custom resources specs.")
- this
- }*/
+ @PublicEvolving
+ def minResources: ResourceSpec = stream.getMinResources()
/**
- * Sets the resource of this operation.
+ * Returns the preferred resources of this operation.
*/
- /*
- def resource(resource: ResourceSpec) : Unit = {
- this.resource(resource, resource)
- }*/
+ @PublicEvolving
+ def preferredResources: ResourceSpec = stream.getPreferredResources()
+
+// ---------------------------------------------------------------------------
+// Fine-grained resource profiles are an incomplete work-in-progress feature
+// The setters are hence commented out at this point.
+// ---------------------------------------------------------------------------
+// /**
+// * Sets the minimum and preferred resources of this operation.
+// */
+// @PublicEvolving
+// def resources(minResources: ResourceSpec, preferredResources: ResourceSpec) : DataStream[T] =
+// stream match {
+// case stream : SingleOutputStreamOperator[T] => asScalaStream(
+// stream.setResources(minResources, preferredResources))
+// case _ =>
+// throw new UnsupportedOperationException("Operator does not support " +
+// "configuring custom resources specs.")
+// this
+// }
+//
+// /**
+// * Sets the resource of this operation.
+// */
+// @PublicEvolving
+// def resources(resources: ResourceSpec) : Unit = {
+// this.resources(resources, resources)
+// }
/**
* Gets the name of the current data stream. This name is
http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala
index 907ad9f..7abb392 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala
@@ -41,21 +41,6 @@ abstract class ScalaAPICompletenessTestBase extends TestLogger {
protected def isExcludedByName(method: Method): Boolean
/**
- * Determines whether a method is excluded by an interface it uses.
- */
- protected def isExcludedByInterface(method: Method): Boolean = {
- val excludedInterfaces =
- Set("org.apache.spark.Logging", "org.apache.hadoop.mapreduce.HadoopMapReduceUtil")
- def toComparisionKey(method: Method) =
- (method.getReturnType, method.getName, method.getGenericReturnType)
- val interfaces = method.getDeclaringClass.getInterfaces.filter { i =>
- excludedInterfaces.contains(i.getName)
- }
- val excludedMethods = interfaces.flatMap(_.getMethods.map(toComparisionKey))
- excludedMethods.contains(toComparisionKey(method))
- }
-
- /**
* Utility to be called during the test.
*/
protected def checkMethods(
@@ -66,26 +51,33 @@ abstract class ScalaAPICompletenessTestBase extends TestLogger {
val javaMethods = javaClass.getMethods
.filterNot(_.isAccessible)
.filterNot(isExcludedByName)
- .filterNot(isExcludedByInterface)
.map(m => m.getName).toSet
val scalaMethods = scalaClass.getMethods
.filterNot(_.isAccessible)
.filterNot(isExcludedByName)
- .filterNot(isExcludedByInterface)
.map(m => m.getName).toSet
val missingMethods = javaMethods -- scalaMethods
- for (method <- missingMethods) {
- fail("Method " + method + " from " + javaClass + " is missing from " + scalaClassName + ".")
+ for (javaMethod <- missingMethods) {
+ // check if the method simply follows different getter / setter conventions in Scala / Java
+ // for example Java: getFoo() should match Scala: foo()
+ if (!containsScalaGetterLike(javaMethod, scalaMethods)) {
+ fail(s"Method $javaMethod from $javaClass is missing from $scalaClassName.")
+ }
}
}
- protected def checkEquality(scalaInstance: AnyRef, extractJavaFun : ((AnyRef) => AnyRef)) {
- val javaInstance = extractJavaFun(scalaInstance)
+ protected def containsScalaGetterLike(javaMethod: String, scalaMethods: Set[String]): Boolean = {
+ if (javaMethod.startsWith("get") && javaMethod.length >= 4) {
+ val scalaMethodName = Character.toLowerCase(javaMethod.charAt(3)) + javaMethod.substring(4)
+ scalaMethods.contains(scalaMethodName)
+ } else {
+ false
+ }
}
-
+
/**
* Tests to be performed to ensure API completeness.
*/