You are viewing a plain-text version of this content. The canonical link to it (originally given as "here") was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by gy...@apache.org on 2023/01/12 15:23:59 UTC

[flink] branch master updated: [FLINK-30213] Fix logic for determining downstream subtasks for partitioner replacement

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new fb482fe3984 [FLINK-30213] Fix logic for determining downstream subtasks for partitioner replacement
fb482fe3984 is described below

commit fb482fe39844efda33a4c05858903f5b64e158a3
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Tue Jan 3 11:49:58 2023 +0100

    [FLINK-30213] Fix logic for determining downstream subtasks for partitioner replacement
---
 .../flink/streaming/runtime/tasks/StreamTask.java  | 10 +++++--
 .../runtime/tasks/StreamMockEnvironment.java       |  6 +++-
 .../streaming/runtime/tasks/StreamTaskTest.java    | 34 +++++++---------------
 3 files changed, 23 insertions(+), 27 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 097be6d07f2..40ad6a1fceb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -1610,7 +1610,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
         int index = 0;
         for (NonChainedOutput streamOutput : outputsInOrder) {
-            replaceForwardPartitionerIfConsumerParallelismDoesNotMatch(environment, streamOutput);
+            replaceForwardPartitionerIfConsumerParallelismDoesNotMatch(
+                    environment, streamOutput, index);
             recordWriters.add(
                     createRecordWriter(
                             streamOutput,
@@ -1623,10 +1624,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
     }
 
     private static void replaceForwardPartitionerIfConsumerParallelismDoesNotMatch(
-            Environment environment, NonChainedOutput streamOutput) {
+            Environment environment, NonChainedOutput streamOutput, int outputIndex) {
         if (streamOutput.getPartitioner() instanceof ForwardPartitioner
-                && streamOutput.getConsumerParallelism()
+                && environment.getWriter(outputIndex).getNumberOfSubpartitions()
                         != environment.getTaskInfo().getNumberOfParallelSubtasks()) {
+            LOG.debug(
+                    "Replacing forward partitioner with rebalance for {}",
+                    environment.getTaskInfo().getTaskNameWithSubtasks());
             streamOutput.setPartitioner(new RebalancePartitioner<>());
         }
     }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 567af50d5b1..9d37bc16cf8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -95,7 +95,7 @@ public class StreamMockEnvironment implements Environment {
 
     private final List<IndexedInputGate> inputs;
 
-    private final List<ResultPartitionWriter> outputs;
+    private List<ResultPartitionWriter> outputs;
 
     private final JobID jobID;
 
@@ -236,6 +236,10 @@ public class StreamMockEnvironment implements Environment {
         }
     }
 
+    public void setOutputs(List<ResultPartitionWriter> outputs) {
+        this.outputs = outputs;
+    }
+
     public void setExternalExceptionHandler(Consumer<Throwable> externalExceptionHandler) {
         this.externalExceptionHandler = externalExceptionHandler;
     }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 51f3d0d2311..4219810e081 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
@@ -58,11 +57,10 @@ import org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWrite
 import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.api.writer.SingleRecordWriter;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.MockResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
 import org.apache.flink.runtime.metrics.TimerGauge;
@@ -117,7 +115,6 @@ import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.graph.NonChainedOutput;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
@@ -143,7 +140,6 @@ import org.apache.flink.util.CloseableIterable;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FatalExitExceptionHandler;
 import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.clock.SystemClock;
 import org.apache.flink.util.concurrent.FutureUtils;
@@ -1835,24 +1831,16 @@ public class StreamTaskTest extends TestLogger {
                                     .getChannelSelector()
                             instanceof ForwardPartitioner);
 
-            // Change consumer parallelism
-            harness.streamTask.configuration.setVertexNonChainedOutputs(
-                    Arrays.asList(
-                            new NonChainedOutput(
-                                    false,
-                                    0,
-                                    // Set a different consumer parallelism to force trigger
-                                    // replacing the ForwardPartitioner
-                                    42,
-                                    100,
-                                    1000,
-                                    false,
-                                    new IntermediateDataSetID(),
-                                    new OutputTag<>("output", IntegerTypeInfo.INT_TYPE_INFO),
-                                    // Use forward partitioner
-                                    new ForwardPartitioner<>(),
-                                    ResultPartitionType.PIPELINED)));
-            harness.streamTask.configuration.serializeAllConfigs();
+            // Simulate changed downstream task parallelism (1->2)
+            List<ResultPartitionWriter> newOutputs = new ArrayList<>();
+            newOutputs.add(
+                    new MockResultPartitionWriter() {
+                        @Override
+                        public int getNumberOfSubpartitions() {
+                            return 2;
+                        }
+                    });
+            harness.streamMockEnvironment.setOutputs(newOutputs);
 
             // Re-create outputs
             recordWriterDelegate =