You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2023/01/02 13:04:17 UTC

[flink] branch master updated (ded2df542fd -> 1c2f4eb4ba1)

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from ded2df542fd [FLINK-29363] allow fully redirection in web dashboard
     new 61d6e78e1f6 [FLINK-30213] Change ForwardPartitioner to RebalancePartitioner on parallelism changes
     new 3b1288e136c Replace Java 11 call
     new d20533e5877 Use RebalancePartitioner
     new 41518d6b5e0 PR comment
     new 1c2f4eb4ba1 Update flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/runtime/dispatcher/Dispatcher.java       |  3 +-
 .../api/writer/ChannelSelectorRecordWriter.java    |  6 ++
 .../streaming/api/graph/NonChainedOutput.java      |  6 +-
 .../flink/streaming/runtime/tasks/StreamTask.java  | 12 ++++
 .../runtime/tasks/StreamConfigChainer.java         |  8 ++-
 .../tasks/StreamTaskFinalCheckpointsTest.java      |  4 +-
 .../tasks/StreamTaskMailboxTestHarnessBuilder.java | 17 ++++--
 .../streaming/runtime/tasks/StreamTaskTest.java    | 69 ++++++++++++++++++++++
 8 files changed, 115 insertions(+), 10 deletions(-)


[flink] 05/05: Update flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java

Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1c2f4eb4ba1d1b5ad403d7991170ca16cb71df56
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Tue Dec 20 12:54:26 2022 +0100

    Update flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
    
    Co-authored-by: Rui Fan <19...@gmail.com>
---
 .../java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java    | 1 -
 1 file changed, 1 deletion(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 35261d449f2..38523c18431 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -132,7 +132,6 @@ import org.apache.flink.streaming.runtime.io.DataInputStatus;
 import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
-import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
 import org.apache.flink.streaming.util.MockStreamConfig;


[flink] 03/05: Use RebalancePartitioner

Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d20533e58770c276e7fc2ac0029dbf61ecf0b19e
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Thu Dec 15 15:46:15 2022 +0100

    Use RebalancePartitioner
---
 .../java/org/apache/flink/streaming/runtime/tasks/StreamTask.java     | 4 ++--
 .../java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java | 3 ++-
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 16666b71804..5130861b844 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -92,7 +92,7 @@ import org.apache.flink.streaming.runtime.io.checkpointing.BarrierAlignmentUtil;
 import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler;
 import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.mailbox.GaugePeriodTimer;
@@ -1622,7 +1622,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
         if (streamOutput.getPartitioner() instanceof ForwardPartitioner
                 && streamOutput.getConsumerParallelism()
                         != environment.getTaskInfo().getNumberOfParallelSubtasks()) {
-            streamOutput.setPartitioner(new RescalePartitioner<>());
+            streamOutput.setPartitioner(new RebalancePartitioner<>());
         }
     }
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 5d5bba63af6..ff398d37d1c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -131,6 +131,7 @@ import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.runtime.io.DataInputStatus;
 import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
@@ -1864,7 +1865,7 @@ public class StreamTaskTest extends TestLogger {
                                             ((SingleRecordWriter) recordWriterDelegate)
                                                     .getRecordWriter(0))
                                     .getChannelSelector()
-                            instanceof RescalePartitioner);
+                            instanceof RebalancePartitioner);
         }
     }
 


[flink] 02/05: Replace Java 11 call

Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3b1288e136cd73b3f5755283951ef414b870eccc
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Mon Dec 5 16:14:18 2022 +0100

    Replace Java 11 call
---
 .../java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 30063b0a09e..5d5bba63af6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -1836,7 +1836,7 @@ public class StreamTaskTest extends TestLogger {
 
             // Change consumer parallelism
             harness.streamTask.configuration.setVertexNonChainedOutputs(
-                    List.of(
+                    Arrays.asList(
                             new NonChainedOutput(
                                     false,
                                     0,


[flink] 01/05: [FLINK-30213] Change ForwardPartitioner to RebalancePartitioner on parallelism changes

Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 61d6e78e1f6dff093e005c834e1a008f146c02b9
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Mon Nov 28 11:37:24 2022 +0100

    [FLINK-30213] Change ForwardPartitioner to RebalancePartitioner on parallelism changes
    
    In case of parallelism changes to the JobGraph, as done via the AdaptiveScheduler
    or through providing JobVertexId overrides in PipelineOptions#PARALLELISM_OVERRIDES, the inner
    serialized PartitionStrategy of a StreamTask may not be suitable anymore.
    
    This is the case for the ForwardPartitioner strategy which uses a fixed local channel for
    transmitting data. Whenever the consumer parallelism doesn't match the local parallelism, we should
    be replacing it with the RebalancePartitioner.
---
 .../flink/runtime/dispatcher/Dispatcher.java       |  3 +-
 .../api/writer/ChannelSelectorRecordWriter.java    |  6 ++
 .../streaming/api/graph/NonChainedOutput.java      |  6 +-
 .../flink/streaming/runtime/tasks/StreamTask.java  | 12 ++++
 .../runtime/tasks/StreamConfigChainer.java         |  8 ++-
 .../tasks/StreamTaskFinalCheckpointsTest.java      |  4 +-
 .../tasks/StreamTaskMailboxTestHarnessBuilder.java | 17 ++++--
 .../streaming/runtime/tasks/StreamTaskTest.java    | 69 ++++++++++++++++++++++
 8 files changed, 115 insertions(+), 10 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index c733af51334..cfa36bad1df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -1342,11 +1342,12 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
         for (JobVertex vertex : jobGraph.getVertices()) {
             String override = overrides.get(vertex.getID().toHexString());
             if (override != null) {
+                int currentParallelism = vertex.getParallelism();
                 int overrideParallelism = Integer.parseInt(override);
                 log.info(
                         "Changing job vertex {} parallelism from {} to {}",
                         vertex.getID(),
-                        vertex.getParallelism(),
+                        currentParallelism,
                         overrideParallelism);
                 vertex.setParallelism(overrideParallelism);
             }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
index 07181bd01f1..5b756b693b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.api.writer;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.io.IOReadableWritable;
 
 import java.io.IOException;
@@ -71,4 +72,9 @@ public final class ChannelSelectorRecordWriter<T extends IOReadableWritable>
             flushAll();
         }
     }
+
+    @VisibleForTesting
+    public ChannelSelector<T> getChannelSelector() {
+        return channelSelector;
+    }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/NonChainedOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/NonChainedOutput.java
index 20d357c14e4..1042b396276 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/NonChainedOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/NonChainedOutput.java
@@ -59,7 +59,7 @@ public class NonChainedOutput implements Serializable {
     private final OutputTag<?> outputTag;
 
     /** The corresponding data partitioner. */
-    private final StreamPartitioner<?> partitioner;
+    private StreamPartitioner<?> partitioner;
 
     /** Target {@link ResultPartitionType}. */
     private final ResultPartitionType partitionType;
@@ -119,6 +119,10 @@ public class NonChainedOutput implements Serializable {
         return outputTag;
     }
 
+    public void setPartitioner(StreamPartitioner<?> partitioner) {
+        this.partitioner = partitioner;
+    }
+
     public StreamPartitioner<?> getPartitioner() {
         return partitioner;
     }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index dc6a9dff6e4..16666b71804 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -91,6 +91,8 @@ import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
 import org.apache.flink.streaming.runtime.io.checkpointing.BarrierAlignmentUtil;
 import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler;
 import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.mailbox.GaugePeriodTimer;
@@ -1603,6 +1605,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
         int index = 0;
         for (NonChainedOutput streamOutput : outputsInOrder) {
+            replaceForwardPartitionerIfConsumerParallelismDoesNotMatch(environment, streamOutput);
             recordWriters.add(
                     createRecordWriter(
                             streamOutput,
@@ -1614,6 +1617,15 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
         return recordWriters;
     }
 
+    private static void replaceForwardPartitionerIfConsumerParallelismDoesNotMatch(
+            Environment environment, NonChainedOutput streamOutput) {
+        if (streamOutput.getPartitioner() instanceof ForwardPartitioner
+                && streamOutput.getConsumerParallelism()
+                        != environment.getTaskInfo().getNumberOfParallelSubtasks()) {
+            streamOutput.setPartitioner(new RescalePartitioner<>());
+        }
+    }
+
     @SuppressWarnings("unchecked")
     private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter(
             NonChainedOutput streamOutput,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
index 37520e0515c..100340a6ac5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
@@ -35,6 +35,7 @@ import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -203,6 +204,11 @@ public class StreamConfigChainer<OWNER> {
     }
 
     public <OUT> OWNER finishForSingletonOperatorChain(TypeSerializer<OUT> outputSerializer) {
+        return finishForSingletonOperatorChain(outputSerializer, new BroadcastPartitioner<>());
+    }
+
+    public <OUT> OWNER finishForSingletonOperatorChain(
+            TypeSerializer<OUT> outputSerializer, StreamPartitioner<?> partitioner) {
 
         checkState(chainIndex == 0, "Use finishForSingletonOperatorChain");
         checkState(headConfig == tailConfig);
@@ -231,7 +237,7 @@ public class StreamConfigChainer<OWNER> {
                             false,
                             new IntermediateDataSetID(),
                             null,
-                            new BroadcastPartitioner<>(),
+                            partitioner,
                             ResultPartitionType.PIPELINED_BOUNDED));
         }
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
index 08ab5d99e60..637784d132f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
@@ -53,6 +53,7 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.CompletingCheckpointResponder;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -125,7 +126,8 @@ public class StreamTaskFinalCheckpointsTest {
                             .addInput(STRING_TYPE_INFO)
                             .addAdditionalOutput(partitionWriters)
                             .setupOperatorChain(new EmptyOperator())
-                            .finishForSingletonOperatorChain(StringSerializer.INSTANCE)
+                            .finishForSingletonOperatorChain(
+                                    StringSerializer.INSTANCE, new BroadcastPartitioner<>())
                             .build()) {
                 testHarness.endInput();
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
index 7fb875ada35..dcf9ffb3b8c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
@@ -58,6 +58,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.util.function.FunctionWithException;
 
@@ -108,6 +109,8 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> {
     private Function<SingleInputGateBuilder, SingleInputGateBuilder> modifyGateBuilder =
             Function.identity();
 
+    private StreamPartitioner<?> partitioner = new BroadcastPartitioner<>();
+
     public StreamTaskMailboxTestHarnessBuilder(
             FunctionWithException<Environment, ? extends StreamTask<OUT, ?>, Exception> taskFactory,
             TypeInformation<OUT> outputType) {
@@ -324,11 +327,7 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> {
                         0, null, null, (StreamOperator<?>) null, null, SourceStreamTask.class);
         StreamEdge streamEdge =
                 new StreamEdge(
-                        sourceVertexDummy,
-                        targetVertexDummy,
-                        gateIndex + 1,
-                        new BroadcastPartitioner<>(),
-                        null);
+                        sourceVertexDummy, targetVertexDummy, gateIndex + 1, partitioner, null);
 
         inPhysicalEdges.add(streamEdge);
         streamMockEnvironment.addInputGate(inputGates[gateIndex].getInputGate());
@@ -415,7 +414,7 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> {
             StreamOperatorFactory<?> factory, OperatorID operatorID) {
         checkState(!setupCalled, "This harness was already setup.");
         return setupOperatorChain(operatorID, factory)
-                .finishForSingletonOperatorChain(outputSerializer);
+                .finishForSingletonOperatorChain(outputSerializer, partitioner);
     }
 
     public StreamConfigChainer<StreamTaskMailboxTestHarnessBuilder<OUT>> setupOperatorChain(
@@ -462,6 +461,12 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> {
         return this;
     }
 
+    public StreamTaskMailboxTestHarnessBuilder<OUT> setOutputPartitioner(
+            StreamPartitioner partitioner) {
+        this.partitioner = partitioner;
+        return this;
+    }
+
     /**
      * A place holder representation of a {@link SourceInputConfig}. When building the test harness
      * it is replaced with {@link SourceInputConfig}.
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 9f5bbf227e9..30063b0a09e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
@@ -53,10 +54,15 @@ import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.api.StopMode;
 import org.apache.flink.runtime.io.network.api.writer.AvailabilityTestResultPartitionWriter;
+import org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.api.writer.SingleRecordWriter;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
 import org.apache.flink.runtime.metrics.TimerGauge;
@@ -65,6 +71,7 @@ import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -109,6 +116,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.graph.NonChainedOutput;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
@@ -122,6 +130,8 @@ import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.runtime.io.DataInputStatus;
 import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
 import org.apache.flink.streaming.util.MockStreamConfig;
@@ -132,6 +142,7 @@ import org.apache.flink.util.CloseableIterable;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FatalExitExceptionHandler;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.clock.SystemClock;
 import org.apache.flink.util.concurrent.FutureUtils;
@@ -1799,6 +1810,64 @@ public class StreamTaskTest extends TestLogger {
         }
     }
 
+    @Test
+    public void testForwardPartitionerIsConvertedToRebalanceOnParallelismChanges()
+            throws Exception {
+        StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                        .setOutputPartitioner(new ForwardPartitioner<>())
+                        .setupOutputForSingletonOperatorChain(
+                                new TestBoundedOneInputStreamOperator());
+
+        try (StreamTaskMailboxTestHarness<Integer> harness = builder.build()) {
+
+            RecordWriterDelegate<SerializationDelegate<StreamRecord<Object>>> recordWriterDelegate =
+                    harness.streamTask.createRecordWriterDelegate(
+                            harness.streamTask.configuration, harness.streamMockEnvironment);
+            // Prerequisite: We are using the ForwardPartitioner
+            assertTrue(
+                    ((ChannelSelectorRecordWriter)
+                                            ((SingleRecordWriter) recordWriterDelegate)
+                                                    .getRecordWriter(0))
+                                    .getChannelSelector()
+                            instanceof ForwardPartitioner);
+
+            // Change consumer parallelism
+            harness.streamTask.configuration.setVertexNonChainedOutputs(
+                    List.of(
+                            new NonChainedOutput(
+                                    false,
+                                    0,
+                                    // Set a different consumer parallelism to force trigger
+                                    // replacing the ForwardPartitioner
+                                    42,
+                                    100,
+                                    1000,
+                                    false,
+                                    new IntermediateDataSetID(),
+                                    new OutputTag<>("output", IntegerTypeInfo.INT_TYPE_INFO),
+                                    // Use forward partitioner
+                                    new ForwardPartitioner<>(),
+                                    ResultPartitionType.PIPELINED)));
+            harness.streamTask.configuration.serializeAllConfigs();
+
+            // Re-create outputs
+            recordWriterDelegate =
+                    harness.streamTask.createRecordWriterDelegate(
+                            harness.streamTask.configuration, harness.streamMockEnvironment);
+            // We should now have a RescalePartitioner to distribute the load
+            // for the non-matching downstream parallelism
+            assertTrue(
+                    ((ChannelSelectorRecordWriter)
+                                            ((SingleRecordWriter) recordWriterDelegate)
+                                                    .getRecordWriter(0))
+                                    .getChannelSelector()
+                            instanceof RescalePartitioner);
+        }
+    }
+
     private int getCurrentBufferSize(InputGate inputGate) {
         return getTestChannel(inputGate, 0).getCurrentBufferSize();
     }


[flink] 04/05: PR comment

Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 41518d6b5e03eeb7b30b0e60c89f4dcdfdcb814c
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Thu Dec 15 17:30:07 2022 +0100

    PR comment
    
    Co-authored-by: Rui Fan <19...@gmail.com>
---
 .../java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index ff398d37d1c..35261d449f2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -1858,7 +1858,7 @@ public class StreamTaskTest extends TestLogger {
             recordWriterDelegate =
                     harness.streamTask.createRecordWriterDelegate(
                             harness.streamTask.configuration, harness.streamMockEnvironment);
-            // We should now have a RescalePartitioner to distribute the load
+            // We should now have a RebalancePartitioner to distribute the load
             // for the non-matching downstream parallelism
             assertTrue(
                     ((ChannelSelectorRecordWriter)