Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/12/02 13:40:10 UTC

[GitHub] [flink] mxm opened a new pull request, #21443: [FLINK-30213] Change ForwardPartitioner to RebalancePartitioner on parallelism changes

mxm opened a new pull request, #21443:
URL: https://github.com/apache/flink/pull/21443

   ## What is the purpose of the change
   
   When the parallelism of the JobGraph changes, as done via the AdaptiveScheduler or by providing JobVertexId overrides in PipelineOptions#PARALLELISM_OVERRIDES, the inner serialized PartitionStrategy of a StreamTask may no longer be suitable.
   
   This is the case for the ForwardPartitioner strategy, which uses a fixed local channel for transmitting data. Whenever the consumer parallelism doesn't match the local parallelism, we should replace it with the RebalancePartitioner.
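   
   A minimal sketch of the rule described above, written against hypothetical stand-in types rather than Flink's classes (`ForwardFallbackSketch`, `Strategy`, and `effectiveStrategy` are illustrative names only, not part of this change): a forward strategy is kept only while producer and consumer parallelism are equal, and anything else is left untouched.
   
   ```java
   // Simplified, hypothetical model of the check described above.
   // None of these types are Flink classes.
   final class ForwardFallbackSketch {
   
       /** Stand-in for the partitioner configured on a non-chained output. */
       enum Strategy { FORWARD, REBALANCE, HASH }
   
       /**
        * Returns the strategy to use at runtime. FORWARD is only kept when
        * producer and consumer parallelism match; otherwise it falls back
        * to REBALANCE. Every other strategy is returned unchanged.
        */
       static Strategy effectiveStrategy(
               Strategy configured, int producerParallelism, int consumerParallelism) {
           if (configured == Strategy.FORWARD
                   && producerParallelism != consumerParallelism) {
               return Strategy.REBALANCE;
           }
           return configured;
       }
   
       public static void main(String[] args) {
           // Matching parallelism: the forward strategy stays in place.
           System.out.println(effectiveStrategy(Strategy.FORWARD, 4, 4));
           // Mismatched parallelism: the forward strategy is replaced.
           System.out.println(effectiveStrategy(Strategy.FORWARD, 4, 42));
           // Other strategies are never touched by this rule.
           System.out.println(effectiveStrategy(Strategy.HASH, 4, 42));
       }
   }
   ```
   
   The sketch only models the decision itself; where the check runs and how the partitioner is swapped inside the task is up to the actual change.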
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] mxm commented on pull request #21443: [FLINK-30213] Change ForwardPartitioner to RebalancePartitioner on parallelism changes

Posted by GitBox <gi...@apache.org>.
mxm commented on PR #21443:
URL: https://github.com/apache/flink/pull/21443#issuecomment-1359251921

   @flinkbot run azure




[GitHub] [flink] 1996fanrui commented on a diff in pull request #21443: [FLINK-30213] Change ForwardPartitioner to RebalancePartitioner on parallelism changes

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #21443:
URL: https://github.com/apache/flink/pull/21443#discussion_r1049789012


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java:
##########
@@ -1799,6 +1811,64 @@ public void testMailboxMetricsMeasurement() throws Exception {
         }
     }
 
+    @Test
+    public void testForwardPartitionerIsConvertedToRebalanceOnParallelismChanges()
+            throws Exception {
+        StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                        .setOutputPartitioner(new ForwardPartitioner<>())
+                        .setupOutputForSingletonOperatorChain(
+                                new TestBoundedOneInputStreamOperator());
+
+        try (StreamTaskMailboxTestHarness<Integer> harness = builder.build()) {
+
+            RecordWriterDelegate<SerializationDelegate<StreamRecord<Object>>> recordWriterDelegate =
+                    harness.streamTask.createRecordWriterDelegate(
+                            harness.streamTask.configuration, harness.streamMockEnvironment);
+            // Prerequisite: We are using the ForwardPartitioner
+            assertTrue(
+                    ((ChannelSelectorRecordWriter)
+                                            ((SingleRecordWriter) recordWriterDelegate)
+                                                    .getRecordWriter(0))
+                                    .getChannelSelector()
+                            instanceof ForwardPartitioner);
+
+            // Change consumer parallelism
+            harness.streamTask.configuration.setVertexNonChainedOutputs(
+                    Arrays.asList(
+                            new NonChainedOutput(
+                                    false,
+                                    0,
+                                    // Set a different consumer parallelism to force trigger
+                                    // replacing the ForwardPartitioner
+                                    42,
+                                    100,
+                                    1000,
+                                    false,
+                                    new IntermediateDataSetID(),
+                                    new OutputTag<>("output", IntegerTypeInfo.INT_TYPE_INFO),
+                                    // Use forward partitioner
+                                    new ForwardPartitioner<>(),
+                                    ResultPartitionType.PIPELINED)));
+            harness.streamTask.configuration.serializeAllConfigs();
+
+            // Re-create outputs
+            recordWriterDelegate =
+                    harness.streamTask.createRecordWriterDelegate(
+                            harness.streamTask.configuration, harness.streamMockEnvironment);
+            // We should now have a RescalePartitioner to distribute the load

Review Comment:
   Hi @mxm, thanks for your update. Here is a minor comment.
   
   ```suggestion
               // We should now have a RebalancePartitioner to distribute the load
   ```





[GitHub] [flink] 1996fanrui commented on a diff in pull request #21443: [FLINK-30213] Change ForwardPartitioner to RebalancePartitioner on parallelism changes

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #21443:
URL: https://github.com/apache/flink/pull/21443#discussion_r1039758558


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##########
@@ -1614,6 +1617,15 @@ List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters
         return recordWriters;
     }
 
+    private static void replaceForwardPartitionerIfConsumerParallelismDoesNotMatch(
+            Environment environment, NonChainedOutput streamOutput) {
+        if (streamOutput.getPartitioner() instanceof ForwardPartitioner
+                && streamOutput.getConsumerParallelism()
+                        != environment.getTaskInfo().getNumberOfParallelSubtasks()) {
+            streamOutput.setPartitioner(new RescalePartitioner<>());

Review Comment:
   Hi @mxm, thanks for your contribution.
   
   Should we use `RebalancePartitioner` or `RescalePartitioner`?





[GitHub] [flink] mxm commented on a diff in pull request #21443: [FLINK-30213] Change ForwardPartitioner to RebalancePartitioner on parallelism changes

Posted by GitBox <gi...@apache.org>.
mxm commented on code in PR #21443:
URL: https://github.com/apache/flink/pull/21443#discussion_r1049868134


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java:
##########
@@ -1799,6 +1811,64 @@ public void testMailboxMetricsMeasurement() throws Exception {
         }
     }
 
+    @Test
+    public void testForwardPartitionerIsConvertedToRebalanceOnParallelismChanges()
+            throws Exception {
+        StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                        .setOutputPartitioner(new ForwardPartitioner<>())
+                        .setupOutputForSingletonOperatorChain(
+                                new TestBoundedOneInputStreamOperator());
+
+        try (StreamTaskMailboxTestHarness<Integer> harness = builder.build()) {
+
+            RecordWriterDelegate<SerializationDelegate<StreamRecord<Object>>> recordWriterDelegate =
+                    harness.streamTask.createRecordWriterDelegate(
+                            harness.streamTask.configuration, harness.streamMockEnvironment);
+            // Prerequisite: We are using the ForwardPartitioner
+            assertTrue(
+                    ((ChannelSelectorRecordWriter)
+                                            ((SingleRecordWriter) recordWriterDelegate)
+                                                    .getRecordWriter(0))
+                                    .getChannelSelector()
+                            instanceof ForwardPartitioner);
+
+            // Change consumer parallelism
+            harness.streamTask.configuration.setVertexNonChainedOutputs(
+                    Arrays.asList(
+                            new NonChainedOutput(
+                                    false,
+                                    0,
+                                    // Set a different consumer parallelism to force trigger
+                                    // replacing the ForwardPartitioner
+                                    42,
+                                    100,
+                                    1000,
+                                    false,
+                                    new IntermediateDataSetID(),
+                                    new OutputTag<>("output", IntegerTypeInfo.INT_TYPE_INFO),
+                                    // Use forward partitioner
+                                    new ForwardPartitioner<>(),
+                                    ResultPartitionType.PIPELINED)));
+            harness.streamTask.configuration.serializeAllConfigs();
+
+            // Re-create outputs
+            recordWriterDelegate =
+                    harness.streamTask.createRecordWriterDelegate(
+                            harness.streamTask.configuration, harness.streamMockEnvironment);
+            // We should now have a RescalePartitioner to distribute the load

Review Comment:
   Thanks!





[GitHub] [flink] 1996fanrui commented on a diff in pull request #21443: [FLINK-30213] Change ForwardPartitioner to RebalancePartitioner on parallelism changes

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #21443:
URL: https://github.com/apache/flink/pull/21443#discussion_r1043185697


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##########
@@ -1614,6 +1617,15 @@ List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters
         return recordWriters;
     }
 
+    private static void replaceForwardPartitionerIfConsumerParallelismDoesNotMatch(
+            Environment environment, NonChainedOutput streamOutput) {
+        if (streamOutput.getPartitioner() instanceof ForwardPartitioner
+                && streamOutput.getConsumerParallelism()
+                        != environment.getTaskInfo().getNumberOfParallelSubtasks()) {
+            streamOutput.setPartitioner(new RescalePartitioner<>());

Review Comment:
   If Flink users set `rescale` manually, the same problem exists.





[GitHub] [flink] gyfora merged pull request #21443: [FLINK-30213] Change ForwardPartitioner to RebalancePartitioner on parallelism changes

Posted by GitBox <gi...@apache.org>.
gyfora merged PR #21443:
URL: https://github.com/apache/flink/pull/21443




[GitHub] [flink] mxm commented on a diff in pull request #21443: [FLINK-30213] Change ForwardPartitioner to RebalancePartitioner on parallelism changes

Posted by GitBox <gi...@apache.org>.
mxm commented on code in PR #21443:
URL: https://github.com/apache/flink/pull/21443#discussion_r1042409667


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##########
@@ -1614,6 +1617,15 @@ List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters
         return recordWriters;
     }
 
+    private static void replaceForwardPartitionerIfConsumerParallelismDoesNotMatch(
+            Environment environment, NonChainedOutput streamOutput) {
+        if (streamOutput.getPartitioner() instanceof ForwardPartitioner
+                && streamOutput.getConsumerParallelism()
+                        != environment.getTaskInfo().getNumberOfParallelSubtasks()) {
+            streamOutput.setPartitioner(new RescalePartitioner<>());

Review Comment:
   Not sure, I think they essentially achieve the same thing. What do you think?





[GitHub] [flink] mxm commented on a diff in pull request #21443: [FLINK-30213] Change ForwardPartitioner to RebalancePartitioner on parallelism changes

Posted by GitBox <gi...@apache.org>.
mxm commented on code in PR #21443:
URL: https://github.com/apache/flink/pull/21443#discussion_r1053234583


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java:
##########
@@ -122,6 +130,9 @@
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.runtime.io.DataInputStatus;
 import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;

Review Comment:
   Thanks @1996fanrui!





[GitHub] [flink] mxm commented on a diff in pull request #21443: [FLINK-30213] Change ForwardPartitioner to RebalancePartitioner on parallelism changes

Posted by GitBox <gi...@apache.org>.
mxm commented on code in PR #21443:
URL: https://github.com/apache/flink/pull/21443#discussion_r1049729462


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##########
@@ -1614,6 +1617,15 @@ List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters
         return recordWriters;
     }
 
+    private static void replaceForwardPartitionerIfConsumerParallelismDoesNotMatch(
+            Environment environment, NonChainedOutput streamOutput) {
+        if (streamOutput.getPartitioner() instanceof ForwardPartitioner
+                && streamOutput.getConsumerParallelism()
+                        != environment.getTaskInfo().getNumberOfParallelSubtasks()) {
+            streamOutput.setPartitioner(new RescalePartitioner<>());

Review Comment:
   Thanks @1996fanrui. It makes sense to use RebalancePartitioner.





[GitHub] [flink] 1996fanrui commented on a diff in pull request #21443: [FLINK-30213] Change ForwardPartitioner to RebalancePartitioner on parallelism changes

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #21443:
URL: https://github.com/apache/flink/pull/21443#discussion_r1042919428


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##########
@@ -1614,6 +1617,15 @@ List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters
         return recordWriters;
     }
 
+    private static void replaceForwardPartitionerIfConsumerParallelismDoesNotMatch(
+            Environment environment, NonChainedOutput streamOutput) {
+        if (streamOutput.getPartitioner() instanceof ForwardPartitioner
+                && streamOutput.getConsumerParallelism()
+                        != environment.getTaskInfo().getNumberOfParallelSubtasks()) {
+            streamOutput.setPartitioner(new RescalePartitioner<>());

Review Comment:
   Thanks for your feedback. In most scenarios, `RescalePartitioner` works well. However, I'm not sure it is generic enough.
   
   For example, a job has 2 vertices, both with parallelism 200, connected by a `ForwardPartitioner`. Autoscaling finds that vertex2 performs poorly, so it increases the parallelism of vertex2 from 200 to 300, while the parallelism of vertex1 stays unchanged.
   
   With the `RescalePartitioner`, 100 subtasks of vertex1 will each send data to one downstream subtask, and the remaining 100 subtasks of vertex1 will each send data to 2 downstream subtasks.
   
   The first half of the source may still have lag.
   
   I'm not sure whether this will actually happen.
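   
   The 200-to-300 arithmetic above can be checked with a small sketch. It assumes a simplified pointwise model in which every downstream subtask reads from exactly one upstream subtask and downstream indices are spread evenly over upstream indices; the index formula and all names (`PointwiseFanOutSketch`, `fanOut`, `countWithFanOut`) are illustrative assumptions, not Flink's actual assignment code.
   
   ```java
   // Hypothetical model of a pointwise (rescale-style) connection pattern.
   // Assumes downstreamParallelism >= upstreamParallelism.
   final class PointwiseFanOutSketch {
   
       /**
        * Returns, for each upstream subtask, the number of downstream
        * subtasks it feeds when each downstream subtask has one producer.
        */
       static int[] fanOut(int upstreamParallelism, int downstreamParallelism) {
           int[] counts = new int[upstreamParallelism];
           for (int d = 0; d < downstreamParallelism; d++) {
               // Assumed mapping: scale the downstream index into the upstream range.
               int u = (int) ((long) d * upstreamParallelism / downstreamParallelism);
               counts[u]++;
           }
           return counts;
       }
   
       /** Counts how many upstream subtasks feed exactly n downstream subtasks. */
       static int countWithFanOut(int[] counts, int n) {
           int result = 0;
           for (int c : counts) {
               if (c == n) {
                   result++;
               }
           }
           return result;
       }
   
       public static void main(String[] args) {
           int[] counts = fanOut(200, 300);
           System.out.println("fan-out 1: " + countWithFanOut(counts, 1));
           System.out.println("fan-out 2: " + countWithFanOut(counts, 2));
       }
   }
   ```
   
   Under this model, 100 upstream subtasks end up with a single consumer and 100 with two, which is the uneven split described above. A rebalance-style strategy would instead let every upstream subtask cycle through all 300 downstream subtasks.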





[GitHub] [flink] flinkbot commented on pull request #21443: [FLINK-30213] Change ForwardPartitioner to RebalancePartitioner on parallelism changes

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #21443:
URL: https://github.com/apache/flink/pull/21443#issuecomment-1335250901

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "d265ebca00eca3ee03f7a60e0e366f1d7a6f6879",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "d265ebca00eca3ee03f7a60e0e366f1d7a6f6879",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d265ebca00eca3ee03f7a60e0e366f1d7a6f6879 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] mxm commented on pull request #21443: [FLINK-30213] Change ForwardPartitioner to RebalancePartitioner on parallelism changes

Posted by GitBox <gi...@apache.org>.
mxm commented on PR #21443:
URL: https://github.com/apache/flink/pull/21443#issuecomment-1337554370

   @flinkbot run azure




[GitHub] [flink] 1996fanrui commented on a diff in pull request #21443: [FLINK-30213] Change ForwardPartitioner to RebalancePartitioner on parallelism changes

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #21443:
URL: https://github.com/apache/flink/pull/21443#discussion_r1051603792


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java:
##########
@@ -122,6 +130,9 @@
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.runtime.io.DataInputStatus;
 import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;

Review Comment:
   ```suggestion
   ```


