You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2023/01/02 13:04:17 UTC
[flink] branch master updated (ded2df542fd -> 1c2f4eb4ba1)
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
from ded2df542fd [FLINK-29363] allow fully redirection in web dashboard
new 61d6e78e1f6 [FLINK-30213] Change ForwardPartitioner to RebalancePartitioner on parallelism changes
new 3b1288e136c Replace Java 11 call
new d20533e5877 Use RebalancePartitioner
new 41518d6b5e0 PR comment
new 1c2f4eb4ba1 Update flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../flink/runtime/dispatcher/Dispatcher.java | 3 +-
.../api/writer/ChannelSelectorRecordWriter.java | 6 ++
.../streaming/api/graph/NonChainedOutput.java | 6 +-
.../flink/streaming/runtime/tasks/StreamTask.java | 12 ++++
.../runtime/tasks/StreamConfigChainer.java | 8 ++-
.../tasks/StreamTaskFinalCheckpointsTest.java | 4 +-
.../tasks/StreamTaskMailboxTestHarnessBuilder.java | 17 ++++--
.../streaming/runtime/tasks/StreamTaskTest.java | 69 ++++++++++++++++++++++
8 files changed, 115 insertions(+), 10 deletions(-)
[flink] 05/05: Update flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1c2f4eb4ba1d1b5ad403d7991170ca16cb71df56
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Tue Dec 20 12:54:26 2022 +0100
Update flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
Co-authored-by: Rui Fan <19...@gmail.com>
---
.../java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java | 1 -
1 file changed, 1 deletion(-)
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 35261d449f2..38523c18431 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -132,7 +132,6 @@ import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
-import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.streaming.util.MockStreamConfig;
[flink] 03/05: Use RebalancePartitioner
Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit d20533e58770c276e7fc2ac0029dbf61ecf0b19e
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Thu Dec 15 15:46:15 2022 +0100
Use RebalancePartitioner
---
.../java/org/apache/flink/streaming/runtime/tasks/StreamTask.java | 4 ++--
.../java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java | 3 ++-
2 files changed, 4 insertions(+), 3 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 16666b71804..5130861b844 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -92,7 +92,7 @@ import org.apache.flink.streaming.runtime.io.checkpointing.BarrierAlignmentUtil;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.mailbox.GaugePeriodTimer;
@@ -1622,7 +1622,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
if (streamOutput.getPartitioner() instanceof ForwardPartitioner
&& streamOutput.getConsumerParallelism()
!= environment.getTaskInfo().getNumberOfParallelSubtasks()) {
- streamOutput.setPartitioner(new RescalePartitioner<>());
+ streamOutput.setPartitioner(new RebalancePartitioner<>());
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 5d5bba63af6..ff398d37d1c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -131,6 +131,7 @@ import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
@@ -1864,7 +1865,7 @@ public class StreamTaskTest extends TestLogger {
((SingleRecordWriter) recordWriterDelegate)
.getRecordWriter(0))
.getChannelSelector()
- instanceof RescalePartitioner);
+ instanceof RebalancePartitioner);
}
}
[flink] 02/05: Replace Java 11 call
Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3b1288e136cd73b3f5755283951ef414b870eccc
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Mon Dec 5 16:14:18 2022 +0100
Replace Java 11 call
---
.../java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 30063b0a09e..5d5bba63af6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -1836,7 +1836,7 @@ public class StreamTaskTest extends TestLogger {
// Change consumer parallelism
harness.streamTask.configuration.setVertexNonChainedOutputs(
- List.of(
+ Arrays.asList(
new NonChainedOutput(
false,
0,
[flink] 01/05: [FLINK-30213] Change ForwardPartitioner to RebalancePartitioner on parallelism changes
Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 61d6e78e1f6dff093e005c834e1a008f146c02b9
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Mon Nov 28 11:37:24 2022 +0100
[FLINK-30213] Change ForwardPartitioner to RebalancePartitioner on parallelism changes
In case of parallelism changes to the JobGraph, as done via the AdaptiveScheduler
or through providing JobVertexId overrides in PipelineOptions#PARALLELISM_OVERRIDES, the inner
serialized PartitionStrategy of a StreamTask may not be suitable anymore.
This is the case for the ForwardPartitioner strategy which uses a fixed local channel for
transmitting data. Whenever the consumer parallelism doesn't match the local parallelism, we should
be replacing it with the RebalancePartitioner.
---
.../flink/runtime/dispatcher/Dispatcher.java | 3 +-
.../api/writer/ChannelSelectorRecordWriter.java | 6 ++
.../streaming/api/graph/NonChainedOutput.java | 6 +-
.../flink/streaming/runtime/tasks/StreamTask.java | 12 ++++
.../runtime/tasks/StreamConfigChainer.java | 8 ++-
.../tasks/StreamTaskFinalCheckpointsTest.java | 4 +-
.../tasks/StreamTaskMailboxTestHarnessBuilder.java | 17 ++++--
.../streaming/runtime/tasks/StreamTaskTest.java | 69 ++++++++++++++++++++++
8 files changed, 115 insertions(+), 10 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index c733af51334..cfa36bad1df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -1342,11 +1342,12 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
for (JobVertex vertex : jobGraph.getVertices()) {
String override = overrides.get(vertex.getID().toHexString());
if (override != null) {
+ int currentParallelism = vertex.getParallelism();
int overrideParallelism = Integer.parseInt(override);
log.info(
"Changing job vertex {} parallelism from {} to {}",
vertex.getID(),
- vertex.getParallelism(),
+ currentParallelism,
overrideParallelism);
vertex.setParallelism(overrideParallelism);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
index 07181bd01f1..5b756b693b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.io.network.api.writer;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.io.IOReadableWritable;
import java.io.IOException;
@@ -71,4 +72,9 @@ public final class ChannelSelectorRecordWriter<T extends IOReadableWritable>
flushAll();
}
}
+
+ @VisibleForTesting
+ public ChannelSelector<T> getChannelSelector() {
+ return channelSelector;
+ }
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/NonChainedOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/NonChainedOutput.java
index 20d357c14e4..1042b396276 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/NonChainedOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/NonChainedOutput.java
@@ -59,7 +59,7 @@ public class NonChainedOutput implements Serializable {
private final OutputTag<?> outputTag;
/** The corresponding data partitioner. */
- private final StreamPartitioner<?> partitioner;
+ private StreamPartitioner<?> partitioner;
/** Target {@link ResultPartitionType}. */
private final ResultPartitionType partitionType;
@@ -119,6 +119,10 @@ public class NonChainedOutput implements Serializable {
return outputTag;
}
+ public void setPartitioner(StreamPartitioner<?> partitioner) {
+ this.partitioner = partitioner;
+ }
+
public StreamPartitioner<?> getPartitioner() {
return partitioner;
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index dc6a9dff6e4..16666b71804 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -91,6 +91,8 @@ import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
import org.apache.flink.streaming.runtime.io.checkpointing.BarrierAlignmentUtil;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.mailbox.GaugePeriodTimer;
@@ -1603,6 +1605,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
int index = 0;
for (NonChainedOutput streamOutput : outputsInOrder) {
+ replaceForwardPartitionerIfConsumerParallelismDoesNotMatch(environment, streamOutput);
recordWriters.add(
createRecordWriter(
streamOutput,
@@ -1614,6 +1617,15 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
return recordWriters;
}
+ private static void replaceForwardPartitionerIfConsumerParallelismDoesNotMatch(
+ Environment environment, NonChainedOutput streamOutput) {
+ if (streamOutput.getPartitioner() instanceof ForwardPartitioner
+ && streamOutput.getConsumerParallelism()
+ != environment.getTaskInfo().getNumberOfParallelSubtasks()) {
+ streamOutput.setPartitioner(new RescalePartitioner<>());
+ }
+ }
+
@SuppressWarnings("unchecked")
private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter(
NonChainedOutput streamOutput,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
index 37520e0515c..100340a6ac5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
@@ -35,6 +35,7 @@ import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import java.util.Collections;
import java.util.HashMap;
@@ -203,6 +204,11 @@ public class StreamConfigChainer<OWNER> {
}
public <OUT> OWNER finishForSingletonOperatorChain(TypeSerializer<OUT> outputSerializer) {
+ return finishForSingletonOperatorChain(outputSerializer, new BroadcastPartitioner<>());
+ }
+
+ public <OUT> OWNER finishForSingletonOperatorChain(
+ TypeSerializer<OUT> outputSerializer, StreamPartitioner<?> partitioner) {
checkState(chainIndex == 0, "Use finishForSingletonOperatorChain");
checkState(headConfig == tailConfig);
@@ -231,7 +237,7 @@ public class StreamConfigChainer<OWNER> {
false,
new IntermediateDataSetID(),
null,
- new BroadcastPartitioner<>(),
+ partitioner,
ResultPartitionType.PIPELINED_BOUNDED));
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
index 08ab5d99e60..637784d132f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
@@ -53,6 +53,7 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.CompletingCheckpointResponder;
import org.apache.flink.util.FlinkRuntimeException;
@@ -125,7 +126,8 @@ public class StreamTaskFinalCheckpointsTest {
.addInput(STRING_TYPE_INFO)
.addAdditionalOutput(partitionWriters)
.setupOperatorChain(new EmptyOperator())
- .finishForSingletonOperatorChain(StringSerializer.INSTANCE)
+ .finishForSingletonOperatorChain(
+ StringSerializer.INSTANCE, new BroadcastPartitioner<>())
.build()) {
testHarness.endInput();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
index 7fb875ada35..dcf9ffb3b8c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
@@ -58,6 +58,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.util.function.FunctionWithException;
@@ -108,6 +109,8 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> {
private Function<SingleInputGateBuilder, SingleInputGateBuilder> modifyGateBuilder =
Function.identity();
+ private StreamPartitioner<?> partitioner = new BroadcastPartitioner<>();
+
public StreamTaskMailboxTestHarnessBuilder(
FunctionWithException<Environment, ? extends StreamTask<OUT, ?>, Exception> taskFactory,
TypeInformation<OUT> outputType) {
@@ -324,11 +327,7 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> {
0, null, null, (StreamOperator<?>) null, null, SourceStreamTask.class);
StreamEdge streamEdge =
new StreamEdge(
- sourceVertexDummy,
- targetVertexDummy,
- gateIndex + 1,
- new BroadcastPartitioner<>(),
- null);
+ sourceVertexDummy, targetVertexDummy, gateIndex + 1, partitioner, null);
inPhysicalEdges.add(streamEdge);
streamMockEnvironment.addInputGate(inputGates[gateIndex].getInputGate());
@@ -415,7 +414,7 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> {
StreamOperatorFactory<?> factory, OperatorID operatorID) {
checkState(!setupCalled, "This harness was already setup.");
return setupOperatorChain(operatorID, factory)
- .finishForSingletonOperatorChain(outputSerializer);
+ .finishForSingletonOperatorChain(outputSerializer, partitioner);
}
public StreamConfigChainer<StreamTaskMailboxTestHarnessBuilder<OUT>> setupOperatorChain(
@@ -462,6 +461,12 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> {
return this;
}
+ public StreamTaskMailboxTestHarnessBuilder<OUT> setOutputPartitioner(
+ StreamPartitioner partitioner) {
+ this.partitioner = partitioner;
+ return this;
+ }
+
/**
* A place holder representation of a {@link SourceInputConfig}. When building the test harness
* it is replaced with {@link SourceInputConfig}.
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 9f5bbf227e9..30063b0a09e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
@@ -53,10 +54,15 @@ import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.api.writer.AvailabilityTestResultPartitionWriter;
+import org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.api.writer.SingleRecordWriter;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
import org.apache.flink.runtime.metrics.TimerGauge;
@@ -65,6 +71,7 @@ import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -109,6 +116,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.graph.NonChainedOutput;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
@@ -122,6 +130,8 @@ import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.streaming.util.MockStreamConfig;
@@ -132,6 +142,7 @@ import org.apache.flink.util.CloseableIterable;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FatalExitExceptionHandler;
import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.OutputTag;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.clock.SystemClock;
import org.apache.flink.util.concurrent.FutureUtils;
@@ -1799,6 +1810,64 @@ public class StreamTaskTest extends TestLogger {
}
}
+ @Test
+ public void testForwardPartitionerIsConvertedToRebalanceOnParallelismChanges()
+ throws Exception {
+ StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+ new StreamTaskMailboxTestHarnessBuilder<>(
+ OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+ .addInput(BasicTypeInfo.INT_TYPE_INFO)
+ .setOutputPartitioner(new ForwardPartitioner<>())
+ .setupOutputForSingletonOperatorChain(
+ new TestBoundedOneInputStreamOperator());
+
+ try (StreamTaskMailboxTestHarness<Integer> harness = builder.build()) {
+
+ RecordWriterDelegate<SerializationDelegate<StreamRecord<Object>>> recordWriterDelegate =
+ harness.streamTask.createRecordWriterDelegate(
+ harness.streamTask.configuration, harness.streamMockEnvironment);
+ // Prerequisite: We are using the ForwardPartitioner
+ assertTrue(
+ ((ChannelSelectorRecordWriter)
+ ((SingleRecordWriter) recordWriterDelegate)
+ .getRecordWriter(0))
+ .getChannelSelector()
+ instanceof ForwardPartitioner);
+
+ // Change consumer parallelism
+ harness.streamTask.configuration.setVertexNonChainedOutputs(
+ List.of(
+ new NonChainedOutput(
+ false,
+ 0,
+ // Set a different consumer parallelism to force trigger
+ // replacing the ForwardPartitioner
+ 42,
+ 100,
+ 1000,
+ false,
+ new IntermediateDataSetID(),
+ new OutputTag<>("output", IntegerTypeInfo.INT_TYPE_INFO),
+ // Use forward partitioner
+ new ForwardPartitioner<>(),
+ ResultPartitionType.PIPELINED)));
+ harness.streamTask.configuration.serializeAllConfigs();
+
+ // Re-create outputs
+ recordWriterDelegate =
+ harness.streamTask.createRecordWriterDelegate(
+ harness.streamTask.configuration, harness.streamMockEnvironment);
+ // We should now have a RescalePartitioner to distribute the load
+ // for the non-matching downstream parallelism
+ assertTrue(
+ ((ChannelSelectorRecordWriter)
+ ((SingleRecordWriter) recordWriterDelegate)
+ .getRecordWriter(0))
+ .getChannelSelector()
+ instanceof RescalePartitioner);
+ }
+ }
+
private int getCurrentBufferSize(InputGate inputGate) {
return getTestChannel(inputGate, 0).getCurrentBufferSize();
}
[flink] 04/05: PR comment
Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 41518d6b5e03eeb7b30b0e60c89f4dcdfdcb814c
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Thu Dec 15 17:30:07 2022 +0100
PR comment
Co-authored-by: Rui Fan <19...@gmail.com>
---
.../java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index ff398d37d1c..35261d449f2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -1858,7 +1858,7 @@ public class StreamTaskTest extends TestLogger {
recordWriterDelegate =
harness.streamTask.createRecordWriterDelegate(
harness.streamTask.configuration, harness.streamMockEnvironment);
- // We should now have a RescalePartitioner to distribute the load
+ // We should now have a RebalancePartitioner to distribute the load
// for the non-matching downstream parallelism
assertTrue(
((ChannelSelectorRecordWriter)