Posted to issues@flink.apache.org by "ericxiao251 (via GitHub)" <gi...@apache.org> on 2023/04/20 20:13:38 UTC

[GitHub] [flink] ericxiao251 opened a new pull request, #22438: Add method setMaxParallelism to DataStreamSink.

ericxiao251 opened a new pull request, #22438:
URL: https://github.com/apache/flink/pull/22438

   
   ## What is the purpose of the change
   
   When turning on Flink reactive mode, the [docs](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/elastic_scaling/#configuration) suggest converting all `setParallelism` calls to `setMaxParallelism`.
   
   With the current implementation, `DataStreamSink` exposes only the [`setParallelism` function](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java#L172-L181) of the `Transformation` class. [`Transformation` also has a `setMaxParallelism` function](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java#L248-L285), which is not exposed.
   
   This PR exposes the `setMaxParallelism` function on `DataStreamSink` objects in a Flink pipeline, which helps not only reactive mode but also the [vertex autoscaler](https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling) and non-elastic scaling mode.
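
The delegation described above can be sketched roughly as follows. This is a minimal illustration with stand-in classes, not Flink's actual code: `TransformationSketch` and `SinkSketch` are hypothetical names, and only the method names `setParallelism` and `setMaxParallelism` come from the description.

```java
// Hypothetical stand-ins; these are NOT Flink's Transformation or DataStreamSink.
final class SinkDelegationSketch {

    // Holds the settings that the sink forwards to.
    static final class TransformationSketch {
        private int parallelism = -1;    // -1 means "not set" in this sketch
        private int maxParallelism = -1; // -1 means "not set" in this sketch

        void setParallelism(int parallelism) { this.parallelism = parallelism; }
        void setMaxParallelism(int maxParallelism) { this.maxParallelism = maxParallelism; }
        int getParallelism() { return parallelism; }
        int getMaxParallelism() { return maxParallelism; }
    }

    // Forwards both setters to the transformation and returns itself for chaining.
    static final class SinkSketch {
        private final TransformationSketch transformation = new TransformationSketch();

        SinkSketch setParallelism(int parallelism) {
            transformation.setParallelism(parallelism);
            return this;
        }

        // The newly exposed setter follows the same pattern as setParallelism.
        SinkSketch setMaxParallelism(int maxParallelism) {
            transformation.setMaxParallelism(maxParallelism);
            return this;
        }

        TransformationSketch getTransformation() { return transformation; }
    }

    public static void main(String[] args) {
        SinkSketch sink = new SinkSketch().setParallelism(2).setMaxParallelism(20);
        System.out.println(sink.getTransformation().getMaxParallelism()); // prints 20
    }
}
```

Returning `this` from each setter is what allows a call chain that ends in a sink to keep configuring it.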
   
   ## Brief change log
   
   *(for example:)*
     - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact*
     - *Deployments RPC transmits only the blob storage reference*
     - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (100MB)*
     - *Extended integration test for recovery after master (JobManager) failure*
     - *Added test that validates that TaskInfo is transferred only once across recoveries*
     - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no)
     - The serializers: (yes / no / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / no / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
     - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] huwh commented on pull request #22438: [FLINK-31632] Add method setMaxParallelism to DataStreamSink.

Posted by "huwh (via GitHub)" <gi...@apache.org>.
huwh commented on PR #22438:
URL: https://github.com/apache/flink/pull/22438#issuecomment-1522698089

   Oh, I notice that the Jira ID in the PR title and commit message is wrong. It should be [FLINK-31873](https://issues.apache.org/jira/browse/FLINK-31873)
   
   And I think we don't need separate commits in this PR. We prefer each commit to contain full functionality.




[GitHub] [flink] reswqa merged pull request #22438: [FLINK-31873][API / DataStream] Add method setMaxParallelism to DataStreamSink.

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa merged PR #22438:
URL: https://github.com/apache/flink/pull/22438




[GitHub] [flink] reswqa commented on pull request #22438: [FLINK-31873][API / DataStream] Add method setMaxParallelism to DataStreamSink.

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on PR #22438:
URL: https://github.com/apache/flink/pull/22438#issuecomment-1526941787

   Merge after CI green.




[GitHub] [flink] ericxiao251 commented on pull request #22438: [FLINK-31873][API / DataStream] Add method setMaxParallelism to DataStreamSink.

Posted by "ericxiao251 (via GitHub)" <gi...@apache.org>.
ericxiao251 commented on PR #22438:
URL: https://github.com/apache/flink/pull/22438#issuecomment-1526919022

   > Please put the hotfix commit at the first.
   
   Done :).




[GitHub] [flink] ericxiao251 commented on pull request #22438: [FLINK-31873] Add method setMaxParallelism to DataStreamSink.

Posted by "ericxiao251 (via GitHub)" <gi...@apache.org>.
ericxiao251 commented on PR #22438:
URL: https://github.com/apache/flink/pull/22438#issuecomment-1524532676

   > @ericxiao251 Thanks for the quickly update. Ci is failed with some codestyle problem. You can use "mvn spotless:apply" to fix these violations. And you can also use IDE to automatically reformat code, see https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/ide_setup/.
   
   Ah, thank you! I have made the package-private and style changes. I will let CI run overnight (for me).




[GitHub] [flink] huwh commented on a diff in pull request #22438: [FLINK-31873] Add method setMaxParallelism to DataStreamSink.

Posted by "huwh (via GitHub)" <gi...@apache.org>.
huwh commented on code in PR #22438:
URL: https://github.com/apache/flink/pull/22438#discussion_r1178564640


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##########
@@ -283,6 +283,25 @@ public void testTransformationSetParallelism() {
         assertThat(vertices.get(2).isParallelismConfigured()).isTrue();
     }
 
+    @Test
+    public void testTransformationSetMaxParallelism() {

Review Comment:
   JUnit 4 requires all test methods to be public. We are currently migrating to JUnit 5, which does not have this requirement, so to keep visibility to a minimum it is better to make test methods package-private. 
   There are 5 test methods that are public; the others are package-private. I guess the visibility went unnoticed when these test methods were introduced. 
   
   Anyway, it's better to keep the same style across test cases. Would you mind changing them to package-private in a separate hotfix commit in this PR?





[GitHub] [flink] ericxiao251 commented on a diff in pull request #22438: [FLINK-31873] Add method setMaxParallelism to DataStreamSink.

Posted by "ericxiao251 (via GitHub)" <gi...@apache.org>.
ericxiao251 commented on code in PR #22438:
URL: https://github.com/apache/flink/pull/22438#discussion_r1179231832


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java:
##########
@@ -180,6 +180,18 @@ public DataStreamSink<T> setParallelism(int parallelism) {
         return this;
     }
 
+    /**
+     * Sets the max parallelism for this sink. The degree must be higher than zero and less than the
+     * upper bound.

Review Comment:
   I am calling `Transformation.setMaxParallelism` [src](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java#L277-L285), which performs that validation under the hood.
   
   But I can add it to `DataStreamSink.setMaxParallelism` as well to be explicit. I noticed that is what `SingleOutputStreamOperator.setMaxParallelism` does, [src](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L145-L160).





[GitHub] [flink] ericxiao251 commented on a diff in pull request #22438: [FLINK-31873] Add method setMaxParallelism to DataStreamSink.

Posted by "ericxiao251 (via GitHub)" <gi...@apache.org>.
ericxiao251 commented on code in PR #22438:
URL: https://github.com/apache/flink/pull/22438#discussion_r1177975477


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##########
@@ -283,6 +283,25 @@ public void testTransformationSetParallelism() {
         assertThat(vertices.get(2).isParallelismConfigured()).isTrue();
     }
 
+    @Test
+    public void testTransformationSetMaxParallelism() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        /* The max parallelism of the environment (that is inherited by the source)
+        and the parallelism of the map operator needs to be different for this test */
+        env.setMaxParallelism(4);
+        env.fromSequence(1L, 3L).map(i -> i).setMaxParallelism(10).print().setMaxParallelism(20);

Review Comment:
   Oh funny, `DataStreamSink` doesn't have `getId()` either. I can add that to this PR or create a new issue.





[GitHub] [flink] flinkbot commented on pull request #22438: Add method setMaxParallelism to DataStreamSink.

Posted by "flinkbot (via GitHub)" <gi...@apache.org>.
flinkbot commented on PR #22438:
URL: https://github.com/apache/flink/pull/22438#issuecomment-1516899223

   ## CI report:
   
   * a2a1ef88d6e3ca2010610c8aa99d0b6d0831008e UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] MartijnVisser commented on pull request #22438: Add method setMaxParallelism to DataStreamSink.

Posted by "MartijnVisser (via GitHub)" <gi...@apache.org>.
MartijnVisser commented on PR #22438:
URL: https://github.com/apache/flink/pull/22438#issuecomment-1522085769

   @ericxiao251 Could you change your commit messages to fit with https://flink.apache.org/how-to-contribute/code-style-and-quality-pull-requests/#4-commit-naming-conventions ? That will also auto-link the Jira ticket and your PR together




[GitHub] [flink] huwh commented on a diff in pull request #22438: [FLINK-31632] Add method setMaxParallelism to DataStreamSink.

Posted by "huwh (via GitHub)" <gi...@apache.org>.
huwh commented on code in PR #22438:
URL: https://github.com/apache/flink/pull/22438#discussion_r1177277341


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##########
@@ -283,6 +283,25 @@ public void testTransformationSetParallelism() {
         assertThat(vertices.get(2).isParallelismConfigured()).isTrue();
     }
 
+    @Test
+    public void testTransformationSetMaxParallelism() {

Review Comment:
   All test methods in JUnit 5 should be package-private.
   ```java
   void testTransformationSetMaxParallelism() 
   ``` 
   
   



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##########
@@ -283,6 +283,25 @@ public void testTransformationSetParallelism() {
         assertThat(vertices.get(2).isParallelismConfigured()).isTrue();
     }
 
+    @Test
+    public void testTransformationSetMaxParallelism() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        /* The max parallelism of the environment (that is inherited by the source)
+        and the parallelism of the map operator needs to be different for this test */
+        env.setMaxParallelism(4);
+        env.fromSequence(1L, 3L).map(i -> i).setMaxParallelism(10).print().setMaxParallelism(20);

Review Comment:
   It's better to introduce variables for each DataStream/DataStreamSink and then get StreamNode by their ID.
   
   ```java
   DataStreamSource<Long> source = env.fromSequence(1L, 3L);
   assertThat(streamGraph.getStreamNode(source.getId()).getMaxParallelism()).isEqualTo(4);
   ``` 
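
The expectation in the snippet above (the source reports the environment's max parallelism of 4, while the map and sink report their own values of 10 and 20) can be sketched as a simple fallback rule. This sketch assumes that an operator without an explicit setting falls back to the environment-wide value; the class and method names are hypothetical stand-ins and not Flink APIs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in that models the fallback only; it does not build a real StreamGraph.
final class MaxParallelismFallbackSketch {

    static final int UNSET = -1; // marker for "no explicit max parallelism" in this sketch

    // An explicit positive value wins; otherwise the environment default applies.
    static int resolve(int nodeMaxParallelism, int envMaxParallelism) {
        return nodeMaxParallelism > 0 ? nodeMaxParallelism : envMaxParallelism;
    }

    public static void main(String[] args) {
        int envMaxParallelism = 4; // mirrors env.setMaxParallelism(4) in the test

        Map<String, Integer> configured = new LinkedHashMap<>();
        configured.put("source", UNSET); // inherits from the environment
        configured.put("map", 10);       // mirrors map(...).setMaxParallelism(10)
        configured.put("sink", 20);      // mirrors print().setMaxParallelism(20)

        configured.forEach((name, value) ->
                System.out.println(name + " -> " + resolve(value, envMaxParallelism)));
    }
}
```

Keeping a named reference per operator, as the review suggests, makes each of these three expectations checkable on its own.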





[GitHub] [flink] ericxiao251 commented on a diff in pull request #22438: [FLINK-31873] Add method setMaxParallelism to DataStreamSink.

Posted by "ericxiao251 (via GitHub)" <gi...@apache.org>.
ericxiao251 commented on code in PR #22438:
URL: https://github.com/apache/flink/pull/22438#discussion_r1179247666


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##########
@@ -283,6 +283,27 @@ public void testTransformationSetParallelism() {
         assertThat(vertices.get(2).isParallelismConfigured()).isTrue();
     }
 
+    @Test
+    void testTransformationSetMaxParallelism() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        /* The max parallelism of the environment (that is inherited by the source)
+        and the parallelism of the map operator needs to be different for this test */

Review Comment:
   Ah, thank you. I simply copied the `setParallelism` test, I will edit both comments to use `//` instead of `/* .. */`.





[GitHub] [flink] ericxiao251 commented on pull request #22438: [FLINK-31873][API / DataStream] Add method setMaxParallelism to DataStreamSink.

Posted by "ericxiao251 (via GitHub)" <gi...@apache.org>.
ericxiao251 commented on PR #22438:
URL: https://github.com/apache/flink/pull/22438#issuecomment-1527509022

   CI passed :D.




[GitHub] [flink] ericxiao251 commented on pull request #22438: [FLINK-31873][API / DataStream] Add method setMaxParallelism to DataStreamSink.

Posted by "ericxiao251 (via GitHub)" <gi...@apache.org>.
ericxiao251 commented on PR #22438:
URL: https://github.com/apache/flink/pull/22438#issuecomment-1526147308

   @flinkbot run azure




[GitHub] [flink] ericxiao251 commented on pull request #22438: [FLINK-31873][API / DataStream] Add method setMaxParallelism to DataStreamSink.

Posted by "ericxiao251 (via GitHub)" <gi...@apache.org>.
ericxiao251 commented on PR #22438:
URL: https://github.com/apache/flink/pull/22438#issuecomment-1526922302

   @flinkbot run azure




[GitHub] [flink] ericxiao251 commented on a diff in pull request #22438: [FLINK-31873] Add method setMaxParallelism to DataStreamSink.

Posted by "ericxiao251 (via GitHub)" <gi...@apache.org>.
ericxiao251 commented on code in PR #22438:
URL: https://github.com/apache/flink/pull/22438#discussion_r1177975477


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##########
@@ -283,6 +283,25 @@ public void testTransformationSetParallelism() {
         assertThat(vertices.get(2).isParallelismConfigured()).isTrue();
     }
 
+    @Test
+    public void testTransformationSetMaxParallelism() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        /* The max parallelism of the environment (that is inherited by the source)
+        and the parallelism of the map operator needs to be different for this test */
+        env.setMaxParallelism(4);
+        env.fromSequence(1L, 3L).map(i -> i).setMaxParallelism(10).print().setMaxParallelism(20);

Review Comment:
   Oh interesting, `DataStreamSink` doesn't have `getId()` either. I will use `sink.getTransformation().getId()`, as we do in `DataStreamTest`: https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java#L820






[GitHub] [flink] huwh commented on pull request #22438: [FLINK-31873] Add method setMaxParallelism to DataStreamSink.

Posted by "huwh (via GitHub)" <gi...@apache.org>.
huwh commented on PR #22438:
URL: https://github.com/apache/flink/pull/22438#issuecomment-1524511993

   @ericxiao251 Thanks for the quick update. 
   CI failed with some code style problems. You can use "mvn spotless:apply" to fix these violations. You can also use your IDE to automatically reformat code, see https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/ide_setup/.




[GitHub] [flink] reswqa commented on a diff in pull request #22438: [FLINK-31873] Add method setMaxParallelism to DataStreamSink.

Posted by "reswqa (via GitHub)" <gi...@apache.org>.
reswqa commented on code in PR #22438:
URL: https://github.com/apache/flink/pull/22438#discussion_r1178591601


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##########
@@ -283,6 +283,27 @@ public void testTransformationSetParallelism() {
         assertThat(vertices.get(2).isParallelismConfigured()).isTrue();
     }
 
+    @Test
+    void testTransformationSetMaxParallelism() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        /* The max parallelism of the environment (that is inherited by the source)
+        and the parallelism of the map operator needs to be different for this test */

Review Comment:
   In Flink, `//` is generally used directly for comments.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java:
##########
@@ -180,6 +180,18 @@ public DataStreamSink<T> setParallelism(int parallelism) {
         return this;
     }
 
+    /**
+     * Sets the max parallelism for this sink. The degree must be higher than zero and less than the
+     * upper bound.

Review Comment:
   > The degree must be higher than zero and less than the upper bound.
   
   Just stating this in the Javadoc is not safe enough; we'd better add a direct sanity check for it. For example:
   `OperatorValidationUtils.validateMaxParallelism(maxParallelism, true);`
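
A sanity check of the kind requested here might look roughly like the sketch below. It is a standalone illustration, not the code of `OperatorValidationUtils`: the class name, the error messages, the upper bound value, and the treatment of the bound as inclusive are all assumptions made for this example.

```java
// Hypothetical stand-in for a max-parallelism sanity check; not Flink's implementation.
final class MaxParallelismCheckSketch {

    // Assumed upper bound, chosen for this sketch only.
    static final int UPPER_BOUND = 1 << 15;

    // Rejects values outside [1, UPPER_BOUND]; non-parallel operators must use exactly 1.
    static void validate(int maxParallelism, boolean canBeParallel) {
        if (maxParallelism <= 0) {
            throw new IllegalArgumentException(
                    "Max parallelism must be greater than 0, found: " + maxParallelism);
        }
        if (maxParallelism > UPPER_BOUND) {
            throw new IllegalArgumentException(
                    "Max parallelism must be at most " + UPPER_BOUND + ", found: " + maxParallelism);
        }
        if (!canBeParallel && maxParallelism != 1) {
            throw new IllegalArgumentException(
                    "Max parallelism of a non-parallel operator must be 1, found: " + maxParallelism);
        }
    }

    public static void main(String[] args) {
        validate(20, true); // accepted, no exception
        try {
            validate(0, true);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast in the setter surfaces a bad value at the call site instead of later, when the job graph is built.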





[GitHub] [flink] ericxiao251 commented on pull request #22438: [FLINK-31873] Add method setMaxParallelism to DataStreamSink.

Posted by "ericxiao251 (via GitHub)" <gi...@apache.org>.
ericxiao251 commented on PR #22438:
URL: https://github.com/apache/flink/pull/22438#issuecomment-1525799723

   @flinkbot run azure




[GitHub] [flink] MartijnVisser commented on pull request #22438: Add method setMaxParallelism to DataStreamSink.

Posted by "MartijnVisser (via GitHub)" <gi...@apache.org>.
MartijnVisser commented on PR #22438:
URL: https://github.com/apache/flink/pull/22438#issuecomment-1517405943

   @ericxiao251 Like I've mentioned in the Jira ticket, please open a discussion thread in the Dev mailing list first




[GitHub] [flink] ericxiao251 commented on pull request #22438: [FLINK-31632] Add method setMaxParallelism to DataStreamSink.

Posted by "ericxiao251 (via GitHub)" <gi...@apache.org>.
ericxiao251 commented on PR #22438:
URL: https://github.com/apache/flink/pull/22438#issuecomment-1522095761

   Ah right, done! Thanks for the reminder @MartijnVisser .




[GitHub] [flink] ericxiao251 commented on a diff in pull request #22438: [FLINK-31873] Add method setMaxParallelism to DataStreamSink.

Posted by "ericxiao251 (via GitHub)" <gi...@apache.org>.
ericxiao251 commented on code in PR #22438:
URL: https://github.com/apache/flink/pull/22438#discussion_r1177962682


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##########
@@ -283,6 +283,25 @@ public void testTransformationSetParallelism() {
         assertThat(vertices.get(2).isParallelismConfigured()).isTrue();
     }
 
+    @Test
+    public void testTransformationSetMaxParallelism() {

Review Comment:
   Hey Weihua, that's a good point. I believe I've noticed that in other test files before, but all the tests in this file are marked `public` for some reason 🤔. Should I still mark my test as `public`?





[GitHub] [flink] ericxiao251 commented on a diff in pull request #22438: [FLINK-31873] Add method setMaxParallelism to DataStreamSink.

Posted by "ericxiao251 (via GitHub)" <gi...@apache.org>.
ericxiao251 commented on code in PR #22438:
URL: https://github.com/apache/flink/pull/22438#discussion_r1178570242


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##########
@@ -283,6 +283,25 @@ public void testTransformationSetParallelism() {
         assertThat(vertices.get(2).isParallelismConfigured()).isTrue();
     }
 
+    @Test
+    public void testTransformationSetMaxParallelism() {

Review Comment:
   Sounds good, added a hotfix commit.





[GitHub] [flink] ericxiao251 commented on pull request #22438: [FLINK-31873] Add method setMaxParallelism to DataStreamSink.

Posted by "ericxiao251 (via GitHub)" <gi...@apache.org>.
ericxiao251 commented on PR #22438:
URL: https://github.com/apache/flink/pull/22438#issuecomment-1523579845

   > Oh, I notice that the jira id in PR title and commit message is wrong, it should be [FLINK-31873](https://issues.apache.org/jira/browse/FLINK-31873)
   
   Ah thanks for pointing this out 😅 , must have been looking at another issue at the time as well 🙈 .
   
   > And I think we don't need separate commits in this PR, we prefer each commit to contain full functionality
   
   Thanks! I have squashed the commits into a single commit.

