You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/09/28 09:23:25 UTC

[flink] 02/02: [FLINK-29431] Exceptions during job graph serialization lock up client

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f66cb67687e7df355006630d50b81307d281c150
Author: Weijie Guo <re...@163.com>
AuthorDate: Wed Sep 28 12:10:54 2022 +0800

    [FLINK-29431] Exceptions during job graph serialization lock up client
    
    This closes #20912
---
 .../flink/streaming/api/graph/StreamConfig.java    |  13 +--
 .../api/graph/StreamingJobGraphGeneratorTest.java  | 105 +++++++++++++++++++++
 2 files changed, 110 insertions(+), 8 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 57cb1bca1ba..479b2004580 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -164,10 +164,9 @@ public class StreamConfig implements Serializable {
         FutureUtils.combineAll(chainedTaskFutures.values())
                 .thenAcceptAsync(
                         chainedConfigs -> {
-                            // Serialize all the objects to config.
-                            serializeAllConfigs();
-
                             try {
+                                // Serialize all the objects to config.
+                                serializeAllConfigs();
                                 InstantiationUtil.writeObjectToConfig(
                                         chainedConfigs.stream()
                                                 .collect(
@@ -176,12 +175,10 @@ public class StreamConfig implements Serializable {
                                                                 Function.identity())),
                                         this.config,
                                         CHAINED_TASK_CONFIG);
-                            } catch (IOException e) {
-                                throw new StreamTaskException(
-                                        "Could not serialize object for key chained task config.",
-                                        e);
+                                serializationFuture.complete(this);
+                            } catch (Throwable throwable) {
+                                serializationFuture.completeExceptionally(throwable);
                             }
-                            serializationFuture.complete(this);
                         },
                         ioExecutor);
         return serializationFuture;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index dc534008158..23a481c4ff0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -75,9 +75,11 @@ import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
@@ -94,6 +96,7 @@ import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
 import org.apache.flink.streaming.util.TestAnyModeReadingStreamOperator;
@@ -109,6 +112,8 @@ import org.assertj.core.data.Offset;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.io.IOException;
+import java.io.ObjectOutputStream;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -1691,6 +1696,106 @@ class StreamingJobGraphGeneratorTest {
         assertThat(streamOutputsInOrder).isEqualTo(producedDataSet);
     }
 
+    @Test
+    void testStreamConfigSerializationException() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStreamSource<Integer> source = env.fromElements(1, 2, 3);
+        env.addOperator(
+                new OneInputTransformation<>(
+                        source.getTransformation(),
+                        "serializationTestOperator",
+                        // the factory's writeObject throws an IOException (flag is false), so
+                        // creating the job graph is expected to fail with that root cause.
+                        new SerializationTestOperatorFactory(false),
+                        Types.INT,
+                        1));
+        final StreamGraph streamGraph = env.getStreamGraph();
+        assertThatThrownBy(() -> StreamingJobGraphGenerator.createJobGraph(streamGraph))
+                .hasRootCauseInstanceOf(IOException.class)
+                .hasRootCauseMessage("This operator factory is not serializable.");
+    }
+
+    @Test
+    void testCoordinatedSerializationException() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStreamSource<Integer> source = env.fromElements(1, 2, 3);
+        env.addOperator(
+                new OneInputTransformation<>(
+                        source.getTransformation(),
+                        "serializationTestOperator",
+                        // serializable factory whose coordinator provider throws on write.
+                        new SerializationTestOperatorFactory(true),
+                        Types.INT,
+                        1));
+        final StreamGraph streamGraph = env.getStreamGraph();
+        assertThatThrownBy(() -> StreamingJobGraphGenerator.createJobGraph(streamGraph))
+                .hasRootCauseInstanceOf(IOException.class)
+                .hasRootCauseMessage("This provider is not serializable.");
+    }
+
+    private static class SerializationTestOperatorFactory
+            extends AbstractStreamOperatorFactory<Integer>
+            implements CoordinatedOperatorFactory<Integer> {
+        private final boolean isOperatorFactorySerializable; // false => writeObject throws
+
+        SerializationTestOperatorFactory(boolean isOperatorFactorySerializable) {
+            this.isOperatorFactorySerializable = isOperatorFactorySerializable;
+        }
+
+        @Override
+        public OperatorCoordinator.Provider getCoordinatorProvider(
+                String operatorName, OperatorID operatorID) {
+            return new NonSerializableCoordinatorProvider();
+        }
+
+        private void writeObject(ObjectOutputStream oos) throws IOException {
+            if (!isOperatorFactorySerializable) {
+                throw new IOException("This operator factory is not serializable.");
+            }
+        }
+
+        @Override
+        public <T extends StreamOperator<Integer>> T createStreamOperator(
+                StreamOperatorParameters<Integer> parameters) {
+            // T is chosen by the caller, so this cast cannot be checked by the compiler.
+            @SuppressWarnings("unchecked")
+            final T castedOperator = (T) new SerializationTestOperator();
+            return castedOperator;
+        }
+
+        @Override
+        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+            return SerializationTestOperator.class;
+        }
+    }
+
+    private static class SerializationTestOperator extends AbstractStreamOperator<Integer>
+            implements OneInputStreamOperator<Integer, Integer> {
+        // Stub operator returned by the test factory; it rejects every element.
+        @Override
+        public void processElement(StreamRecord<Integer> element) throws Exception {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    private static class NonSerializableCoordinatorProvider
+            implements OperatorCoordinator.Provider {
+        // Provider stub: every method throws; writeObject fails with an IOException.
+        @Override
+        public OperatorID getOperatorId() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public OperatorCoordinator create(OperatorCoordinator.Context context) throws Exception {
+            throw new UnsupportedOperationException();
+        }
+
+        private void writeObject(ObjectOutputStream oos) throws IOException {
+            throw new IOException("This provider is not serializable.");
+        }
+    }
+
     private static JobVertex findJobVertexWithName(List<JobVertex> vertices, String name) {
         for (JobVertex jobVertex : vertices) {
             if (jobVertex.getName().contains(name)) {