You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/09/28 09:23:25 UTC
[flink] 02/02: [FLINK-29431] Exceptions during job graph serialization lock up client
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
commit f66cb67687e7df355006630d50b81307d281c150
Author: Weijie Guo <re...@163.com>
AuthorDate: Wed Sep 28 12:10:54 2022 +0800
[FLINK-29431] Exceptions during job graph serialization lock up client
This closes #20912
---
.../flink/streaming/api/graph/StreamConfig.java | 13 +--
.../api/graph/StreamingJobGraphGeneratorTest.java | 105 +++++++++++++++++++++
2 files changed, 110 insertions(+), 8 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 57cb1bca1ba..479b2004580 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -164,10 +164,9 @@ public class StreamConfig implements Serializable {
FutureUtils.combineAll(chainedTaskFutures.values())
.thenAcceptAsync(
chainedConfigs -> {
- // Serialize all the objects to config.
- serializeAllConfigs();
-
try {
+ // Serialize all the objects to config.
+ serializeAllConfigs();
InstantiationUtil.writeObjectToConfig(
chainedConfigs.stream()
.collect(
@@ -176,12 +175,10 @@ public class StreamConfig implements Serializable {
Function.identity())),
this.config,
CHAINED_TASK_CONFIG);
- } catch (IOException e) {
- throw new StreamTaskException(
- "Could not serialize object for key chained task config.",
- e);
+ serializationFuture.complete(this);
+ } catch (Throwable throwable) {
+ serializationFuture.completeExceptionally(throwable);
}
- serializationFuture.complete(this);
},
ioExecutor);
return serializationFuture;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index dc534008158..23a481c4ff0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -75,9 +75,11 @@ import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
@@ -94,6 +96,7 @@ import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.util.TestAnyModeReadingStreamOperator;
@@ -109,6 +112,8 @@ import org.assertj.core.data.Offset;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
@@ -1691,6 +1696,106 @@ class StreamingJobGraphGeneratorTest {
assertThat(streamOutputsInOrder).isEqualTo(producedDataSet);
}
+ @Test
+ void testStreamConfigSerializationException() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStreamSource<Integer> source = env.fromElements(1, 2, 3);
+ env.addOperator(
+ new OneInputTransformation<>(
+ source.getTransformation(),
+ "serializationTestOperator",
+ // using a non-serializable operator factory to trigger an IOException when
+ // trying to serialize streamConfig.
+ new SerializationTestOperatorFactory(false),
+ Types.INT,
+ 1));
+ final StreamGraph streamGraph = env.getStreamGraph();
+ assertThatThrownBy(() -> StreamingJobGraphGenerator.createJobGraph(streamGraph))
+ .hasRootCauseInstanceOf(IOException.class)
+ .hasRootCauseMessage("This operator factory is not serializable.");
+ }
+
+ @Test
+ public void testCoordinatedSerializationException() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStreamSource<Integer> source = env.fromElements(1, 2, 3);
+ env.addOperator(
+ new OneInputTransformation<>(
+ source.getTransformation(),
+ "serializationTestOperator",
+ new SerializationTestOperatorFactory(true),
+ Types.INT,
+ 1));
+
+ StreamGraph streamGraph = env.getStreamGraph();
+ assertThatThrownBy(() -> StreamingJobGraphGenerator.createJobGraph(streamGraph))
+ .hasRootCauseInstanceOf(IOException.class)
+ .hasRootCauseMessage("This provider is not serializable.");
+ }
+
+ private static class SerializationTestOperatorFactory
+ extends AbstractStreamOperatorFactory<Integer>
+ implements CoordinatedOperatorFactory<Integer> {
+ private final boolean isOperatorFactorySerializable;
+
+ SerializationTestOperatorFactory(boolean isOperatorFactorySerializable) {
+ this.isOperatorFactorySerializable = isOperatorFactorySerializable;
+ }
+
+ @Override
+ public OperatorCoordinator.Provider getCoordinatorProvider(
+ String operatorName, OperatorID operatorID) {
+ return new NonSerializableCoordinatorProvider();
+ }
+
+ private void writeObject(ObjectOutputStream oos) throws IOException {
+ if (!isOperatorFactorySerializable) {
+ throw new IOException("This operator factory is not serializable.");
+ }
+ }
+
+ @Override
+ public <T extends StreamOperator<Integer>> T createStreamOperator(
+ StreamOperatorParameters<Integer> parameters) {
+ // today's lunch is generics spaghetti
+ @SuppressWarnings("unchecked")
+ final T castedOperator = (T) new SerializationTestOperator();
+ return castedOperator;
+ }
+
+ @Override
+ public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+ return SerializationTestOperator.class;
+ }
+ }
+
+ private static class SerializationTestOperator extends AbstractStreamOperator<Integer>
+ implements OneInputStreamOperator<Integer, Integer> {
+
+ @Override
+ public void processElement(StreamRecord<Integer> element) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private static class NonSerializableCoordinatorProvider
+ implements OperatorCoordinator.Provider {
+
+ @Override
+ public OperatorID getOperatorId() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public OperatorCoordinator create(OperatorCoordinator.Context context) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ private void writeObject(ObjectOutputStream oos) throws IOException {
+ throw new IOException("This provider is not serializable.");
+ }
+ }
+
private static JobVertex findJobVertexWithName(List<JobVertex> vertices, String name) {
for (JobVertex jobVertex : vertices) {
if (jobVertex.getName().contains(name)) {