Posted to issues@flink.apache.org by wangzhijiang999 <gi...@git.apache.org> on 2017/02/14 04:42:11 UTC

[GitHub] flink pull request #3303: [FLINK-5133][core] Add new setResource API for Dat...

GitHub user wangzhijiang999 opened a pull request:

    https://github.com/apache/flink/pull/3303

    [FLINK-5133][core] Add new setResource API for DataStream and DataSet

    This is part of the fine-grained resource configuration.
    For **DataStream**, the **setResource** API is added to **SingleOutputStreamOperator**, similar to other existing properties such as parallelism and name.
    For **DataSet**, the **setResource** API is added to **Operator** in the same way.
    The API takes two parameters, a minimum **ResourceSpec** and a maximum **ResourceSpec**, to allow for dynamic resource resizing in future improvements.
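
A minimal Java sketch of the setter shape described above. It assumes a simplified, hypothetical `ResourceSpec` with only CPU cores and heap memory (the real Flink class carries more fields), and `SketchOperator` is a hypothetical stand-in for `SingleOutputStreamOperator` / `Operator`. The min <= max check is an added assumption, not something the PR description states.

```java
import java.util.Objects;

/** Hypothetical, simplified resource spec: CPU cores and heap memory only. */
final class ResourceSpec {
    final double cpuCores;
    final int heapMemoryInMB;

    ResourceSpec(double cpuCores, int heapMemoryInMB) {
        this.cpuCores = cpuCores;
        this.heapMemoryInMB = heapMemoryInMB;
    }

    /** True if no dimension of this spec exceeds the corresponding dimension of other. */
    boolean lessThanOrEqual(ResourceSpec other) {
        return cpuCores <= other.cpuCores && heapMemoryInMB <= other.heapMemoryInMB;
    }
}

/** Hypothetical operator that stores resources next to other properties such as parallelism. */
class SketchOperator {
    private int parallelism = 1;
    private ResourceSpec minResource;
    private ResourceSpec maxResource;

    SketchOperator setParallelism(int parallelism) {
        this.parallelism = parallelism;
        return this;
    }

    /** Sets both bounds; the maximum leaves headroom for dynamic resizing later. */
    SketchOperator setResource(ResourceSpec minResource, ResourceSpec maxResource) {
        Objects.requireNonNull(minResource, "minResource must not be null");
        Objects.requireNonNull(maxResource, "maxResource must not be null");
        // Assumed validation: the minimum must not exceed the maximum in any dimension.
        if (!minResource.lessThanOrEqual(maxResource)) {
            throw new IllegalArgumentException("minResource must not exceed maxResource");
        }
        this.minResource = minResource;
        this.maxResource = maxResource;
        return this; // fluent, like setParallelism
    }

    int getParallelism() { return parallelism; }
    ResourceSpec getMinResource() { return minResource; }
    ResourceSpec getMaxResource() { return maxResource; }
}
```

Usage: `new SketchOperator().setParallelism(4).setResource(new ResourceSpec(1.0, 256), new ResourceSpec(2.0, 512))`. Validating before assigning keeps the operator unchanged when the arguments are rejected.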

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/wangzhijiang999/flink FLINK-5133

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3303.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3303
    
----
commit de7c4c75c9f1c80bf7e529a1ff05fa870c2df1cb
Author: 淘江 <ta...@alibaba-inc.com>
Date:   2017-02-14T04:37:18Z

    [FLINK-5133][core] Add new setResource API for DataStream and DataSet

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3303: [FLINK-5133][core] Support to set resource for ope...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3303#discussion_r102945697
  
    --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---
    @@ -144,6 +145,43 @@ class DataStream[T](stream: JavaStream[T]) {
         this
       }
     
    +
    +  /**
    +   * Returns the minimum resource of this operation.
    +   */
    +  def getMinResource: ResourceSpec = stream.getMinResource()
    --- End diff --
    
    To make getters feel more like proper Scala, I would write this as
    ```scala
    def minResource: ResourceSpec = stream.getMinResource()
    ```
    
    The "get" keyword is something that Java encourages, but Scala discourages.

[GitHub] flink pull request #3303: [FLINK-5133][core] Support to set resource for ope...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3303#discussion_r102946762
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---
    @@ -178,6 +178,60 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
       }
     
       /**
    +   * Sets the minimum and maximum resources of this operation.
    +   */
    +  def setResource(minResource: ResourceSpec, maxResource: ResourceSpec) = {
    +    javaSet match {
    +      case ds: DataSource[_] => ds.setResource(minResource, maxResource)
    +      case op: Operator[_, _] => op.setResource(minResource, maxResource)
    +      case di: DeltaIterationResultSet[_, _] =>
    +        di.getIterationHead.setResource(minResource, maxResource)
    +      case _ =>
    +        throw new UnsupportedOperationException("Operator " + javaSet.toString + " cannot have " +
    --- End diff --
    
    I would suggest changing the error message in all places to something like "Operator <op> does not support configuring custom resource specs".
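
One way to keep that message identical in all places is to build it in a single helper. A minimal Java sketch: `ResourceConfigurable`, `ConfigurableOp`, `PlainOp` and `Resources.setResource` are hypothetical names, resource specs are plain strings to keep it short, and the `instanceof` dispatch only mirrors the shape of the Scala `match` in the diff.

```java
/** Hypothetical marker for operators that accept resource specs. */
interface ResourceConfigurable {
    void setResource(String minResource, String maxResource);
}

class ConfigurableOp implements ResourceConfigurable {
    String minResource;
    String maxResource;

    @Override
    public void setResource(String minResource, String maxResource) {
        this.minResource = minResource;
        this.maxResource = maxResource;
    }
}

/** Hypothetical operator type that does not support resource configuration. */
class PlainOp {
    @Override
    public String toString() {
        return "PlainOp";
    }
}

final class Resources {
    private Resources() {}

    /** Single source for the error text, so every call site reports it the same way. */
    static String unsupportedMessage(Object op) {
        return "Operator " + op + " does not support configuring custom resource specs.";
    }

    static Object setResource(Object op, String minResource, String maxResource) {
        if (op instanceof ResourceConfigurable) {
            ((ResourceConfigurable) op).setResource(minResource, maxResource);
            return op;
        }
        throw new UnsupportedOperationException(unsupportedMessage(op));
    }
}
```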

[GitHub] flink pull request #3303: [FLINK-5133][core] Support to set resource for ope...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3303

[GitHub] flink issue #3303: [FLINK-5133][core] Support to set resource for operator i...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/3303
  
    A more general question on the resource matching: If I understand it correctly, then the resource manager will try to get the "max" resources for an operator, but potentially go down to the "min" resources if it cannot get the "max" resources.
    
    I am wondering if "max" is confusing here and we should rather call it "preferred". Calling it "max" would make users think that those are the maximum resources that the operator can handle. I think many users would set max very high, thinking that this gives Flink more freedom to find suitable resources. They probably do not expect from the name that it always tries to satisfy the "max" resources.
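
The matching behaviour described above, and why a very high "max" is misleading, can be modelled in a few lines. This is a hypothetical Java sketch of the reviewer's description using a single memory dimension; it is not the actual resource manager logic.

```java
final class ResourceMatcher {
    private ResourceMatcher() {}

    /**
     * Returns the memory (MB) to grant: the "max" amount if it fits into what is available,
     * otherwise the "min" amount, or -1 if even the minimum cannot be satisfied.
     */
    static int grant(int minMB, int maxMB, int availableMB) {
        if (availableMB >= maxMB) {
            return maxMB;   // "max" is what gets requested first, i.e. the preferred amount
        }
        if (availableMB >= minMB) {
            return minMB;   // fall back to the minimum
        }
        return -1;          // cannot be scheduled
    }
}
```

`grant(512, 1024, 2048)` returns 1024, but `grant(512, 1_000_000, 4096)` returns 512: a user who sets "max" very high to give Flink freedom ends up with only the minimum even though 4096 MB were free, which is the argument for calling the value "preferred".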

[GitHub] flink pull request #3303: [FLINK-5133][core] Support to set resource for ope...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3303#discussion_r102942856
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java ---
    @@ -278,4 +283,60 @@ public int getParallelism() {
     
     		return this;
     	}
    +
    +	/**
    +	 * Returns the minimum resource of this data sink. If no minimum resource has been set,
    +	 * it returns the default empty resource.
    +	 *
    +	 * @return The minimum resource of this data sink.
    +	 */
    +	public ResourceSpec getMinResource() {
    +		return this.minResource;
    +	}
    +
    +	/**
    +	 * Returns the maximum resource of this data sink. If no maximum resource has been set,
    +	 * it returns the default empty resource.
    +	 *
    +	 * @return The maximum resource of this data sink.
    +	 */
    +	public ResourceSpec getMaxResource() {
    +		return this.maxResource;
    +	}
    +
    +	/**
    +	 * Sets the minimum and maximum resources for this data sink. This overrides the default empty resource.
    +	 * The minimum resource must be satisfied and the maximum resource specifies the upper bound
    +	 * for dynamic resource resizing.
    +	 *
    +	 * @param minResource The minimum resource for this data sink.
    +	 * @param maxResource The maximum resource for this data sink.
    +	 * @return The data sink with set minimum and maximum resources.
    +	 */
    +	public DataSink<T> setResource(ResourceSpec minResource, ResourceSpec maxResource) {
    +		Preconditions.checkArgument(minResource != null && maxResource != null,
    --- End diff --
    
    Minor comment: Most places throw `NullPointerException` (checkNotNull) for null arguments and use `IllegalArgumentException` (checkArgument) for non-null but invalid parameters.
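
A short Java sketch of that convention. `checkNotNull` and `checkArgument` below are minimal local stand-ins that mirror which exceptions Flink's `Preconditions` methods throw; `SinkSketch` and its memory-only resources are hypothetical.

```java
final class Checks {
    private Checks() {}

    /** Null reference -> NullPointerException. */
    static <T> T checkNotNull(T reference, String message) {
        if (reference == null) {
            throw new NullPointerException(message);
        }
        return reference;
    }

    /** Non-null but invalid value -> IllegalArgumentException. */
    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }
}

/** Hypothetical sink whose resources are reduced to a memory amount in MB. */
class SinkSketch {
    private Integer minMemoryMB;
    private Integer maxMemoryMB;

    SinkSketch setResource(Integer minMemoryMB, Integer maxMemoryMB) {
        // Null arguments are reported as NullPointerException ...
        Checks.checkNotNull(minMemoryMB, "minMemoryMB must not be null");
        Checks.checkNotNull(maxMemoryMB, "maxMemoryMB must not be null");
        // ... while non-null but invalid values are reported as IllegalArgumentException.
        Checks.checkArgument(minMemoryMB <= maxMemoryMB, "min must not exceed max");
        this.minMemoryMB = minMemoryMB;
        this.maxMemoryMB = maxMemoryMB;
        return this;
    }
}
```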

[GitHub] flink issue #3303: [FLINK-5133][core] Support to set resource for operator i...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/3303
  
    One thought that @tillrohrmann and I had: It is probably okay to comment out or remove the **setters** and keep the **getters**. That would let us keep the internal code that depends on the getters.

[GitHub] flink issue #3303: [FLINK-5133][core] Support to set resource for operator i...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/3303
  
    I have merged this to my local repository.
    There were some issues left, partly in the commented out code.
    
    In particular, `checkNotNull(variable != null)` does not work, because `variable != null` evaluates to `Boolean(true)` or `Boolean(false)`, and the check then only verifies that this Boolean is not null (which is always true).
    
    I fixed that and also renamed the variables to use "resources" consistently as plural, and harmonized the Java getter style.
    Will merge to Flink master tomorrow if the tests pass...
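
The pitfall can be reproduced in isolation. In this Java sketch, `checkNotNull` is a minimal local stand-in with the same contract as Flink's `Preconditions.checkNotNull`; `buggy` and `fixed` are hypothetical names.

```java
final class NullCheckPitfall {
    private NullCheckPitfall() {}

    /** Throws NullPointerException if the reference is null, otherwise returns it. */
    static <T> T checkNotNull(T reference) {
        if (reference == null) {
            throw new NullPointerException();
        }
        return reference;
    }

    /** Wrong: "variable != null" is autoboxed to a Boolean, which is never null, so this never throws. */
    static void buggy(Object variable) {
        checkNotNull(variable != null);
    }

    /** Right: pass the reference itself. */
    static void fixed(Object variable) {
        checkNotNull(variable);
    }
}
```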

[GitHub] flink pull request #3303: [FLINK-5133][core] Support to set resource for ope...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3303#discussion_r102942657
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java ---
    @@ -45,6 +45,10 @@
     		
     	private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;  // the number of parallel instances to use
     
    +	private ResourceSpec minResource;			// the minimum resource of the contract instance. optional
    --- End diff --
    
    In the newer code changes, we started using annotations like `@Nullable` (javax.annotation.Nullable) to mark fields that may be null. We found that this helps in code readability and in automatic bug detection in IntelliJ.
    
    I would encourage you to use that as well, if this field can be null.
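
A small Java sketch of what that looks like. To compile without the JSR-305 jar, it declares a local `Nullable` annotation as a stand-in for `javax.annotation.Nullable`; `OperatorSketch` and its `String`-typed resources are hypothetical.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/** Local stand-in for javax.annotation.Nullable. */
@Retention(RetentionPolicy.CLASS)
@Target({ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER})
@interface Nullable {}

/** Hypothetical operator with an optional resource setting. */
class OperatorSketch {
    /** The minimum resources; null means "not configured", and the annotation says so. */
    @Nullable
    private String minResources;

    @Nullable
    String getMinResources() {
        return minResources;
    }

    void setMinResources(@Nullable String minResources) {
        this.minResources = minResources;
    }

    /** A caller that reads a @Nullable value handles the null case explicitly. */
    String describeMinResources() {
        String resources = getMinResources();
        return resources == null ? "not configured" : resources;
    }
}
```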

[GitHub] flink issue #3303: [FLINK-5133][core] Add new setResource API for DataStream...

Posted by wangzhijiang999 <gi...@git.apache.org>.
Github user wangzhijiang999 commented on the issue:

    https://github.com/apache/flink/pull/3303
  
    @StephanEwen, this PR includes a new API that would be visible to users, but it cannot work fully because the follow-up runtime code has not been submitted yet. To avoid confusing users, I will update this PR to hide the API temporarily before it is merged into master. What do you think?

[GitHub] flink issue #3303: [FLINK-5133][core] Support to set resource for operator i...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/3303
  
    The code here looks very good, with a few minor comments.
    
    The main problem is as you mentioned: We are adding something to the API that is not yet supported by the runtime. We have done this before and it always confused users.
    
    If this pull request is changed to "hide" the public API part (like commenting out the public getters and setters), then most of the changes are hidden. It looks like most of the internal code also depends on public getters and hence needs to be commented out as well.
    
    What do you think we should do?

[GitHub] flink pull request #3303: [FLINK-5133][core] Support to set resource for ope...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3303#discussion_r102946397
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---
    @@ -178,6 +178,60 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
       }
     
       /**
    +   * Sets the minimum and maximum resources of this operation.
    +   */
    +  def setResource(minResource: ResourceSpec, maxResource: ResourceSpec) = {
    +    javaSet match {
    +      case ds: DataSource[_] => ds.setResource(minResource, maxResource)
    +      case op: Operator[_, _] => op.setResource(minResource, maxResource)
    +      case di: DeltaIterationResultSet[_, _] =>
    +        di.getIterationHead.setResource(minResource, maxResource)
    +      case _ =>
    +        throw new UnsupportedOperationException("Operator " + javaSet.toString + " cannot have " +
    +          "resource.")
    +    }
    +    this
    +  }
    +
    +  /**
    +   * Sets the resource of this operation.
    +   */
    +  def setResource(resource: ResourceSpec) = {
    +    javaSet match {
    --- End diff --
    
    It may be easier to just call `setResource(resource, resource)` in this function.
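
The same delegation as a Java sketch: the single-argument overload passes its spec as both bounds, so validation lives in one place. `DelegatingOperator` and the memory-only `ResourceSpec` are hypothetical simplifications.

```java
import java.util.Objects;

/** Hypothetical, memory-only resource spec. */
final class ResourceSpec {
    final int memoryMB;

    ResourceSpec(int memoryMB) {
        this.memoryMB = memoryMB;
    }
}

class DelegatingOperator {
    private ResourceSpec minResource;
    private ResourceSpec maxResource;

    /** All null checks and validation live in the two-argument variant. */
    DelegatingOperator setResource(ResourceSpec minResource, ResourceSpec maxResource) {
        Objects.requireNonNull(minResource, "minResource must not be null");
        Objects.requireNonNull(maxResource, "maxResource must not be null");
        if (minResource.memoryMB > maxResource.memoryMB) {
            throw new IllegalArgumentException("minResource must not exceed maxResource");
        }
        this.minResource = minResource;
        this.maxResource = maxResource;
        return this;
    }

    /** Single-spec variant: use the same spec as both minimum and maximum. */
    DelegatingOperator setResource(ResourceSpec resource) {
        return setResource(resource, resource);
    }

    ResourceSpec getMinResource() { return minResource; }
    ResourceSpec getMaxResource() { return maxResource; }
}
```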

[GitHub] flink pull request #3303: [FLINK-5133][core] Support to set resource for ope...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3303#discussion_r102946206
  
    --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---
    @@ -144,6 +145,43 @@ class DataStream[T](stream: JavaStream[T]) {
         this
       }
     
    +
    +  /**
    +   * Returns the minimum resource of this operation.
    +   */
    +  def getMinResource: ResourceSpec = stream.getMinResource()
    +
    +  /**
    +   * Returns the maximum resource of this operation.
    +   */
    +  def getMaxResource: ResourceSpec = stream.getMaxResource()
    +
    +  /**
    +   * Sets the minimum and maximum resources of this operation.
    +   */
    +  def setResource(minResource: ResourceSpec, maxResource: ResourceSpec): DataStream[T] = {
    +    stream match {
    +      case ds: SingleOutputStreamOperator[T] => ds.setResource(minResource, maxResource)
    +      case _ =>
    +        throw new UnsupportedOperationException(
    +          "Operator " + stream + " cannot set the resource.")
    +    }
    +    this
    +  }
    +
    +  /**
    +   * Sets the resource of this operation.
    +   */
    +  def setResource(resource: ResourceSpec): DataStream[T] = {
    --- End diff --
    
    The more Scala-like way of writing this would be a setter named `resource_=`, together with a matching getter `resource` (Scala only rewrites an assignment into the setter call when both are defined):
    ```scala
    def resource: ResourceSpec = ...

    def resource_=(resource: ResourceSpec): Unit = {
        ...
    }
    ```
    That way you can write Scala code like this (and it will translate to the setter call):
    ```scala
    val stream: DataStream[MyType] = ...
    stream.resource = new ResourceSpec(a, b, c, d)
    ```
    (We have missed that in most places in Flink so far, but it may make sense to start adopting the proper Scala style for new changes.)

[GitHub] flink issue #3303: [FLINK-5133][core] Support to set resource for operator i...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/3303
  
    @StephanEwen, sorry for my carelessness with **checkNotNull**; it was a basic mistake. "clean verify" passed on my local machine. Thank you for merging!

[GitHub] flink pull request #3303: [FLINK-5133][core] Support to set resource for ope...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3303#discussion_r102945079
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java ---
    @@ -126,6 +127,18 @@ public static int getNewNodeId() {
     	private int maxParallelism = -1;
     
     	/**
    +	 *  The minimum resource for this stream transformation. It defines the lower limit for
    +	 *  dynamic resource resizing in the future.
    +	 */
    +	private ResourceSpec minResource;
    --- End diff --
    
    Should this be initialized with `UNKNOWN`, or can this field be null? To make these kinds of assumptions visible, the `@Nullable` annotation often helps.
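
The alternative to a nullable field is a non-null sentinel, which removes null checks downstream. A minimal Java sketch, assuming a constant named `UNKNOWN` as the review suggests; `TransformationSketch` and the memory-only `ResourceSpec` are hypothetical simplifications.

```java
import java.util.Objects;

/** Hypothetical, memory-only resource spec with a sentinel for "not configured". */
final class ResourceSpec {
    /** Sentinel meaning no resources were configured; compared by identity. */
    static final ResourceSpec UNKNOWN = new ResourceSpec(-1);

    final int memoryMB;

    ResourceSpec(int memoryMB) {
        this.memoryMB = memoryMB;
    }

    boolean isUnknown() {
        return this == UNKNOWN;
    }
}

class TransformationSketch {
    /** Never null: starts as UNKNOWN, so readers never need a null check. */
    private ResourceSpec minResources = ResourceSpec.UNKNOWN;

    ResourceSpec getMinResources() {
        return minResources;
    }

    void setMinResources(ResourceSpec minResources) {
        this.minResources = Objects.requireNonNull(minResources, "use UNKNOWN instead of null");
    }
}
```

With the sentinel the field needs no `@Nullable`; with a nullable field instead, the annotation documents that callers must check for null.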

[GitHub] flink issue #3303: [FLINK-5133][core] Support to set resource for operator i...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/3303
  
    Hi @StephanEwen, thanks for the detailed reviews of this PR; I learned a lot from your comments.
    
    I considered all your suggestions above and submitted the modifications, including:
    
    1. Commented out the **setters** that would be exposed to users and kept only the **getters**.
    2. Used **preferredResource** instead of **maxResource** so as not to confuse users.
    3. Used the **@Nullable** annotation for some variables.
    4. Distinguished between **checkNotNull** (null arguments) and **checkArgument** (invalid arguments).
    5. Rewrote the Scala API in a more Scala-like way, taking into account **BatchScalaAPICompletenessTest** and **StreamingScalaAPICompletenessTest**, which require some methods to stay the same between Java and Scala.
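
Points 1, 2 and 4 together suggest an operator shape roughly like the following Java sketch. It is hypothetical: the PR commented the setters out, whereas this sketch keeps the setter package-private to show the same "getters public, setter not exposed" idea. The plural `*Resources` names follow the renaming mentioned earlier in the thread, and the memory-only `ResourceSpec` is a simplification.

```java
import java.util.Objects;

/** Hypothetical, memory-only resource spec. */
final class ResourceSpec {
    final int memoryMB;

    ResourceSpec(int memoryMB) {
        this.memoryMB = memoryMB;
    }
}

class OperatorWithHiddenSetter {
    private ResourceSpec minResources;
    private ResourceSpec preferredResources;   // formerly called "max"

    /** Public getters: internal code (e.g. JobGraph generation) can still read the values. */
    public ResourceSpec getMinResources() {
        return minResources;
    }

    public ResourceSpec getPreferredResources() {
        return preferredResources;
    }

    /** Package-private, so users cannot call it until the runtime supports it. */
    void setResources(ResourceSpec minResources, ResourceSpec preferredResources) {
        Objects.requireNonNull(minResources, "minResources must not be null");             // null -> NPE
        Objects.requireNonNull(preferredResources, "preferredResources must not be null");
        if (minResources.memoryMB > preferredResources.memoryMB) {                        // invalid -> IAE
            throw new IllegalArgumentException("minResources must not exceed preferredResources");
        }
        this.minResources = minResources;
        this.preferredResources = preferredResources;
    }
}
```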
    
    BTW, after this PR is merged, I will submit the held-back code related to **JobGraph** generation and the runtime stack.