You are viewing a plain text version of this content. (The canonical link was a hyperlink in the original page and is not preserved in this plain-text copy.)
Posted to commits@flink.apache.org by gy...@apache.org on 2023/01/12 15:23:59 UTC
[flink] branch master updated: [FLINK-30213] Fix logic for determining downstream subtasks for partitioner replacement
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git.
The following commit(s) were added to refs/heads/master by this push:
new fb482fe3984 [FLINK-30213] Fix logic for determining downstream subtasks for partitioner replacement
fb482fe3984 is described below
commit fb482fe39844efda33a4c05858903f5b64e158a3
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Tue Jan 3 11:49:58 2023 +0100
[FLINK-30213] Fix logic for determining downstream subtasks for partitioner replacement
---
.../flink/streaming/runtime/tasks/StreamTask.java | 10 +++++--
.../runtime/tasks/StreamMockEnvironment.java | 6 +++-
.../streaming/runtime/tasks/StreamTaskTest.java | 34 +++++++---------------
3 files changed, 23 insertions(+), 27 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 097be6d07f2..40ad6a1fceb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -1610,7 +1610,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
int index = 0;
for (NonChainedOutput streamOutput : outputsInOrder) {
- replaceForwardPartitionerIfConsumerParallelismDoesNotMatch(environment, streamOutput);
+ replaceForwardPartitionerIfConsumerParallelismDoesNotMatch(
+ environment, streamOutput, index);
recordWriters.add(
createRecordWriter(
streamOutput,
@@ -1623,10 +1624,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
private static void replaceForwardPartitionerIfConsumerParallelismDoesNotMatch(
- Environment environment, NonChainedOutput streamOutput) {
+ Environment environment, NonChainedOutput streamOutput, int outputIndex) {
if (streamOutput.getPartitioner() instanceof ForwardPartitioner
- && streamOutput.getConsumerParallelism()
+ && environment.getWriter(outputIndex).getNumberOfSubpartitions()
!= environment.getTaskInfo().getNumberOfParallelSubtasks()) {
+ LOG.debug(
+ "Replacing forward partitioner with rebalance for {}",
+ environment.getTaskInfo().getTaskNameWithSubtasks());
streamOutput.setPartitioner(new RebalancePartitioner<>());
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 567af50d5b1..9d37bc16cf8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -95,7 +95,7 @@ public class StreamMockEnvironment implements Environment {
private final List<IndexedInputGate> inputs;
- private final List<ResultPartitionWriter> outputs;
+ private List<ResultPartitionWriter> outputs;
private final JobID jobID;
@@ -236,6 +236,10 @@ public class StreamMockEnvironment implements Environment {
}
}
+ public void setOutputs(List<ResultPartitionWriter> outputs) {
+ this.outputs = outputs;
+ }
+
public void setExternalExceptionHandler(Consumer<Throwable> externalExceptionHandler) {
this.externalExceptionHandler = externalExceptionHandler;
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 51f3d0d2311..4219810e081 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
@@ -58,11 +57,10 @@ import org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWrite
import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.SingleRecordWriter;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.MockResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
import org.apache.flink.runtime.metrics.TimerGauge;
@@ -117,7 +115,6 @@ import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.graph.NonChainedOutput;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
@@ -143,7 +140,6 @@ import org.apache.flink.util.CloseableIterable;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FatalExitExceptionHandler;
import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.OutputTag;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.clock.SystemClock;
import org.apache.flink.util.concurrent.FutureUtils;
@@ -1835,24 +1831,16 @@ public class StreamTaskTest extends TestLogger {
.getChannelSelector()
instanceof ForwardPartitioner);
- // Change consumer parallelism
- harness.streamTask.configuration.setVertexNonChainedOutputs(
- Arrays.asList(
- new NonChainedOutput(
- false,
- 0,
- // Set a different consumer parallelism to force trigger
- // replacing the ForwardPartitioner
- 42,
- 100,
- 1000,
- false,
- new IntermediateDataSetID(),
- new OutputTag<>("output", IntegerTypeInfo.INT_TYPE_INFO),
- // Use forward partitioner
- new ForwardPartitioner<>(),
- ResultPartitionType.PIPELINED)));
- harness.streamTask.configuration.serializeAllConfigs();
+ // Simulate changed downstream task parallelism (1->2)
+ List<ResultPartitionWriter> newOutputs = new ArrayList<>();
+ newOutputs.add(
+ new MockResultPartitionWriter() {
+ @Override
+ public int getNumberOfSubpartitions() {
+ return 2;
+ }
+ });
+ harness.streamMockEnvironment.setOutputs(newOutputs);
// Re-create outputs
recordWriterDelegate =