You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/07/07 08:14:45 UTC

[flink] branch master updated (4f57d02ddf1 -> 574ffa4219d)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 4f57d02ddf1 [FLINK-26813][sql-parser] Supports ADD/MODIFY column/watermark/constraint syntax for ALTER TABLE (#19193)
     new 2656f95cbaf [hotfix] Remove unused methods in StreamGraph
     new 574ffa4219d [FLINK-28357][datastream] Disallow null elements in StreamNode#typeSerializersIn

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/streaming/api/graph/StreamGraph.java     |  12 --
 .../flink/streaming/api/graph/StreamNode.java      |  13 +-
 .../datastream/FinishedSourcesWatermarkITCase.java | 141 +++++++++++++++++++++
 3 files changed, 149 insertions(+), 17 deletions(-)
 create mode 100644 flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/FinishedSourcesWatermarkITCase.java


[flink] 02/02: [FLINK-28357][datastream] Disallow null elements in StreamNode#typeSerializersIn

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 574ffa4219dc8853100405af8fbdf55b99627cba
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Mon Jul 4 18:26:55 2022 +0200

    [FLINK-28357][datastream] Disallow null elements in StreamNode#typeSerializersIn
    
    Otherwise tasks cannot correctly determine the number of inputs. This was causing an
    issue where a OneInputStreamTask restored as finished was waiting for MAX_WATERMARK
    from two inputs, the second of which was null.
    
    The problem was that {{FinishedOnRestoreInput#FinishedOnRestoreInput}} was being
    constructed with the wrong number of inputs, because of an accidental {{null}}
    passed from the {{StreamGraphGenerator}}.
---
 .../flink/streaming/api/graph/StreamNode.java      |   9 +-
 .../datastream/FinishedSourcesWatermarkITCase.java | 141 +++++++++++++++++++++
 2 files changed, 149 insertions(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 4e400b8485f..1fcae5b9136 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -36,6 +36,7 @@ import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -265,7 +266,13 @@ public class StreamNode {
 
     public void setSerializersIn(TypeSerializer<?>... typeSerializersIn) {
         checkArgument(typeSerializersIn.length > 0);
-        this.typeSerializersIn = typeSerializersIn;
+        // Unfortunately, code higher up the call stack may pass null serializers here, while
+        // users of, e.g., getTypeSerializersIn would be confused by an array that contains
+        // null elements, so nulls are filtered out.
+        this.typeSerializersIn =
+                Arrays.stream(typeSerializersIn)
+                        .filter(typeSerializer -> typeSerializer != null)
+                        .toArray(TypeSerializer<?>[]::new);
     }
 
     public TypeSerializer<?>[] getTypeSerializersIn() {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/FinishedSourcesWatermarkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/FinishedSourcesWatermarkITCase.java
new file mode 100644
index 00000000000..3df4350ac02
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/FinishedSourcesWatermarkITCase.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.api.datastream;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Test that ensures watermarks are correctly propagated when some sources have finished. The
+ * test has one short-lived source that finishes immediately. After the 5th checkpoint the job
+ * fails over, and the test then makes sure that a watermark emitted by the other, still running
+ * source around checkpoint 10 reaches the sink. Only once this happens is the long-running source
+ * allowed to exit. If the watermark is not propagated or is silently swallowed (as for example in
+ * FLINK-28357), the test is expected to livelock.
+ */
+public class FinishedSourcesWatermarkITCase extends TestLogger {
+
+    private static final AtomicLong CHECKPOINT_10_WATERMARK =
+            new AtomicLong(Watermark.MAX_WATERMARK.getTimestamp());
+    private static final AtomicBoolean DOWNSTREAM_CHECKPOINT_10_WATERMARK_ACK = new AtomicBoolean();
+
+    @Test
+    public void testTwoConsecutiveFinishedTasksShouldPropagateMaxWatermark() throws Exception {
+        Configuration conf = new Configuration();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+        // disable chaining to make sure we will have two consecutive tasks (the short lived
+        // source and its map) with Task == FINISHED
+        env.disableOperatorChaining();
+        // Make sure that the short living source has plenty of time to finish before the 5th
+        // checkpoint
+        env.enableCheckpointing(200);
+
+        // create our sources - one that will want to run forever, and another that finishes
+        // immediately
+        DataStream<String> runningStreamIn =
+                env.addSource(new LongRunningSource(), "Long Running Source");
+        DataStream<String> emptyStream =
+                env.addSource(new ShortLivedEmptySource(), "Short Lived Source");
+
+        // pass the empty stream through a simple map() function
+        DataStream<String> mappedEmptyStream = emptyStream.map(v -> v).name("Empty Stream Map");
+
+        // join the two streams together to see what watermark is reached during startup and after a
+        // recovery
+        runningStreamIn
+                .connect(mappedEmptyStream)
+                .process(new NoopCoProcessFunction())
+                .name("Join")
+                .addSink(new SinkWaitingForWatermark());
+
+        env.execute();
+    }
+
+    private static class SinkWaitingForWatermark implements SinkFunction<String> {
+        @Override
+        public void writeWatermark(org.apache.flink.api.common.eventtime.Watermark watermark) {
+            if (watermark.getTimestamp() > CHECKPOINT_10_WATERMARK.get()) {
+                DOWNSTREAM_CHECKPOINT_10_WATERMARK_ACK.set(true);
+            }
+        }
+    }
+
+    private static class LongRunningSource extends RichSourceFunction<String>
+            implements CheckpointListener {
+        private volatile boolean isRunning = true;
+        private long lastEmittedWatermark;
+
+        @Override
+        public void run(SourceContext<String> sourceContext) throws Exception {
+            while (isRunning && !DOWNSTREAM_CHECKPOINT_10_WATERMARK_ACK.get()) {
+                synchronized (sourceContext.getCheckpointLock()) {
+                    lastEmittedWatermark =
+                            Math.max(System.currentTimeMillis(), lastEmittedWatermark);
+                    sourceContext.emitWatermark(new Watermark(lastEmittedWatermark));
+                }
+                Thread.sleep(1);
+            }
+        }
+
+        @Override
+        public void cancel() {
+            isRunning = false;
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) throws Exception {
+            if (checkpointId == 5) {
+                throw new RuntimeException("Force recovery");
+            }
+            if (checkpointId > 10) {
+                CHECKPOINT_10_WATERMARK.set(
+                        Math.min(lastEmittedWatermark, CHECKPOINT_10_WATERMARK.get()));
+            }
+        }
+    }
+
+    private static class ShortLivedEmptySource extends RichSourceFunction<String> {
+        @Override
+        public void run(SourceContext<String> sourceContext) throws Exception {}
+
+        public void cancel() {}
+    }
+
+    private static class NoopCoProcessFunction extends CoProcessFunction<String, String, String> {
+        @Override
+        public void processElement1(String val, Context context, Collector<String> collector) {}
+
+        @Override
+        public void processElement2(String val, Context context, Collector<String> collector) {}
+    }
+}


[flink] 01/02: [hotfix] Remove unused methods in StreamGraph

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2656f95cbaf574ca09f40c5271684f5cb1e9a0c2
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Mon Jul 4 17:44:45 2022 +0200

    [hotfix] Remove unused methods in StreamGraph
---
 .../org/apache/flink/streaming/api/graph/StreamGraph.java    | 12 ------------
 .../org/apache/flink/streaming/api/graph/StreamNode.java     |  4 ----
 2 files changed, 16 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 7f06c6182be..da721ce8b7f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -811,18 +811,6 @@ public class StreamGraph implements Pipeline {
         vertex.setSerializerOut(out);
     }
 
-    public void setSerializersFrom(Integer from, Integer to) {
-        StreamNode fromVertex = getStreamNode(from);
-        StreamNode toVertex = getStreamNode(to);
-
-        toVertex.setSerializersIn(fromVertex.getTypeSerializerOut());
-        toVertex.setSerializerOut(fromVertex.getTypeSerializerIn(0));
-    }
-
-    public <OUT> void setOutType(Integer vertexID, TypeInformation<OUT> outType) {
-        getStreamNode(vertexID).setSerializerOut(outType.createSerializer(executionConfig));
-    }
-
     public void setInputFormat(Integer vertexID, InputFormat<?, ?> inputFormat) {
         getStreamNode(vertexID).setInputFormat(inputFormat);
     }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index cac1cd5c97e..4e400b8485f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -272,10 +272,6 @@ public class StreamNode {
         return typeSerializersIn;
     }
 
-    public TypeSerializer<?> getTypeSerializerIn(int index) {
-        return typeSerializersIn[index];
-    }
-
     public TypeSerializer<?> getTypeSerializerOut() {
         return typeSerializerOut;
     }